// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <butil/strings/string_split.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <limits>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
#include "common/defer.h"
#include "common/util.h"
#include "meta-service/meta_service.h"
#include "meta-store/codec.h"
#include "meta-store/document_message.h"
#include "meta-store/document_message_get_range.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "mock_accessor.h"
#include "rate-limiter/rate_limiter.h"
#include "recycler/checker.h"
#include "recycler/recycler.h"
#include "recycler/storage_vault_accessor.h"
#include "recycler/util.h"
using namespace doris;
using namespace doris::cloud;
extern doris::cloud::RecyclerThreadPoolGroup thread_group;
namespace {
constexpr int DATA_DISK_SIZE_CONST = 100;
constexpr int INDEX_DISK_SIZE_CONST = 10;
constexpr int DISK_SIZE_CONST = 110;
constexpr std::string_view RESOURCE_ID = "mock_resource_id";
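// Build a MetaServiceProxy backed by an in-memory MemTxnKv with the key space
// cleared, so every test starts from an empty store. ASSERT_* macros are wrapped
// in immediately-invoked lambdas because this function does not return void.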
std::unique_ptr<MetaServiceProxy> get_meta_service() {
int ret = 0;
// MemKv
auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
if (txn_kv != nullptr) {
ret = txn_kv->init();
[&] { ASSERT_EQ(ret, 0); }();
}
[&] { ASSERT_NE(txn_kv.get(), nullptr); }();
// FdbKv
// config::fdb_cluster_file_path = "fdb.cluster";
// static auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<FdbTxnKv>());
// static std::atomic<bool> init {false};
// bool tmp = false;
// if (init.compare_exchange_strong(tmp, true)) {
// int ret = txn_kv->init();
// [&] { ASSERT_EQ(ret, 0); ASSERT_NE(txn_kv.get(), nullptr); }();
// }
std::unique_ptr<Transaction> txn;
EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set
EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
auto rs = std::make_shared<ResourceManager>(txn_kv);
auto rl = std::make_shared<RateLimiter>();
auto snapshot = std::make_shared<SnapshotManager>(txn_kv);
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl, snapshot);
return std::make_unique<MetaServiceProxy>(std::move(meta_service));
}
// Create an instance and refresh the resource manager.
void create_and_refresh_instance(
MetaServiceProxy* service, std::string instance_id,
MultiVersionStatus multi_version_status = MultiVersionStatus::MULTI_VERSION_READ_WRITE) {
// write instance
InstanceInfoPB instance_info;
instance_info.set_instance_id(instance_id);
instance_info.set_multi_version_status(multi_version_status);
auto* obj_info = instance_info.mutable_obj_info()->Add();
obj_info->set_id(std::string(RESOURCE_ID));
obj_info->set_ak("mock_ak");
obj_info->set_sk("mock_sk");
obj_info->set_endpoint(config::test_s3_endpoint);
obj_info->set_region(config::test_s3_region);
obj_info->set_bucket(config::test_s3_bucket);
obj_info->set_prefix("");
std::unique_ptr<Transaction> txn;
ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(instance_key(instance_id), instance_info.SerializeAsString());
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
service->resource_mgr()->refresh_instance(instance_id);
}
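// Prepare and commit an index through the meta service RPC interface.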
void prepare_and_commit_index(MetaServiceProxy* service, const std::string& cloud_unique_id,
int64_t db_id, int64_t table_id, int64_t index_id) {
IndexRequest request;
request.set_cloud_unique_id(cloud_unique_id);
request.set_db_id(db_id);
request.set_table_id(table_id);
request.add_index_ids(index_id);
IndexResponse response;
brpc::Controller cntl;
service->prepare_index(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << response.ShortDebugString();
// Commit index
service->commit_index(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << response.ShortDebugString();
}
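// Drop a previously committed index.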
void drop_index(MetaServiceProxy* service, const std::string& cloud_unique_id, int64_t db_id,
int64_t table_id, int64_t index_id) {
IndexRequest request;
request.set_cloud_unique_id(cloud_unique_id);
request.set_db_id(db_id);
request.set_table_id(table_id);
request.add_index_ids(index_id);
IndexResponse response;
brpc::Controller cntl;
service->drop_index(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << response.ShortDebugString();
}
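// Prepare and commit a partition under the given index.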
void prepare_and_commit_partition(MetaServiceProxy* service, const std::string& cloud_unique_id,
int64_t db_id, int64_t table_id, int64_t partition_id,
int64_t index_id) {
PartitionRequest request;
request.set_cloud_unique_id(cloud_unique_id);
request.set_db_id(db_id);
request.set_table_id(table_id);
request.add_partition_ids(partition_id);
request.add_index_ids(index_id);
PartitionResponse response;
brpc::Controller cntl;
service->prepare_partition(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << response.ShortDebugString();
// Commit partition
service->commit_partition(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << response.ShortDebugString();
}
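// Drop a previously committed partition.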
void drop_partition(MetaServiceProxy* service, const std::string& cloud_unique_id, int64_t db_id,
int64_t table_id, int64_t partition_id, int64_t index_id) {
PartitionRequest request;
request.set_cloud_unique_id(cloud_unique_id);
request.set_db_id(db_id);
request.set_table_id(table_id);
request.add_partition_ids(partition_id);
request.add_index_ids(index_id);
PartitionResponse response;
brpc::Controller cntl;
service->drop_partition(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(response.status().code(), MetaServiceCode::OK) << response.ShortDebugString();
}
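// Generate a unique rowset id per call. Not thread-safe; the tests only create
// rowsets from a single thread.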
std::string next_rowset_id() {
static int cnt = 0;
return std::to_string(++cnt);
}
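// Append a tablet meta (with an initial version [0-1] rowset) to the request.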
void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, int64_t partition_id,
int64_t tablet_id, TabletStatePB state = TabletStatePB::PB_RUNNING) {
auto tablet = req.add_tablet_metas();
tablet->set_table_id(table_id);
tablet->set_index_id(index_id);
tablet->set_partition_id(partition_id);
tablet->set_tablet_id(tablet_id);
tablet->set_tablet_state(state);
auto schema = tablet->mutable_schema();
schema->set_schema_version(0);
auto first_rowset = tablet->add_rs_metas();
first_rowset->set_rowset_id(0); // required
first_rowset->set_rowset_id_v2(next_rowset_id());
first_rowset->set_tablet_id(tablet_id);
first_rowset->set_start_version(0);
first_rowset->set_end_version(1);
// Note: version 0-1 rowset has no resource_id and no actual data files,
// only KV metadata needs to be cleaned up during recycling.
first_rowset->mutable_tablet_schema()->CopyFrom(*schema);
}
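// Create a single tablet via the meta service.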
void create_tablet(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
int64_t db_id, int64_t table_id, int64_t index_id, int64_t partition_id,
int64_t tablet_id, TabletStatePB state = TabletStatePB::PB_RUNNING) {
brpc::Controller cntl;
CreateTabletsRequest req;
CreateTabletsResponse res;
req.set_db_id(db_id);
req.set_cloud_unique_id(cloud_unique_id);
add_tablet(req, table_id, index_id, partition_id, tablet_id, state);
meta_service->create_tablets(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}
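// Begin a load transaction and return its id through txn_id.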
void begin_txn(MetaServiceProxy* meta_service, const std::string& cloud_unique_id, int64_t db_id,
const std::string& label, int64_t table_id, int64_t& txn_id) {
brpc::Controller cntl;
BeginTxnRequest req;
BeginTxnResponse res;
req.set_cloud_unique_id(cloud_unique_id);
auto txn_info = req.mutable_txn_info();
txn_info->set_db_id(db_id);
txn_info->set_label(label);
txn_info->add_table_ids(table_id);
txn_info->set_timeout_ms(36000);
meta_service->begin_txn(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label << ", " << res.ShortDebugString();
ASSERT_TRUE(res.has_txn_id()) << label;
txn_id = res.txn_id();
}
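// Commit the load transaction identified by txn_id.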
void commit_txn(MetaServiceProxy* meta_service, const std::string& cloud_unique_id, int64_t db_id,
int64_t txn_id, const std::string& label) {
brpc::Controller cntl;
CommitTxnRequest req;
CommitTxnResponse res;
req.set_cloud_unique_id(cloud_unique_id);
req.set_db_id(db_id);
req.set_txn_id(txn_id);
meta_service->commit_txn(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
}
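// Build a rowset meta whose disk sizes are derived from num_rows and the size
// constants above. A concrete version range is set only when version > 0.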
doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id = 10,
int64_t version = -1, int num_rows = 100) {
doris::RowsetMetaCloudPB rowset;
rowset.set_rowset_id(0); // required
rowset.set_rowset_id_v2(next_rowset_id());
rowset.set_tablet_id(tablet_id);
rowset.set_partition_id(partition_id);
rowset.set_txn_id(txn_id);
if (version > 0) {
rowset.set_start_version(version);
rowset.set_end_version(version);
}
rowset.set_resource_id(std::string(RESOURCE_ID));
rowset.set_num_segments(1);
rowset.set_num_rows(num_rows);
rowset.set_data_disk_size(num_rows * DATA_DISK_SIZE_CONST);
rowset.set_index_disk_size(num_rows * INDEX_DISK_SIZE_CONST);
rowset.set_total_disk_size(num_rows * DISK_SIZE_CONST);
rowset.mutable_tablet_schema()->set_schema_version(0);
rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK
return rowset;
}
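// Prepare a rowset, the first phase of the two-phase rowset commit.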
void prepare_rowset(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
const doris::RowsetMetaCloudPB& rowset) {
brpc::Controller cntl;
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(cloud_unique_id);
req.mutable_rowset_meta()->CopyFrom(rowset);
meta_service->prepare_rowset(&cntl, &req, &resp, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << rowset.ShortDebugString();
}
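// Commit a previously prepared rowset.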
void commit_rowset(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
const doris::RowsetMetaCloudPB& rowset) {
brpc::Controller cntl;
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(cloud_unique_id);
req.mutable_rowset_meta()->CopyFrom(rowset);
meta_service->commit_rowset(&cntl, &req, &resp, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << rowset.ShortDebugString();
}
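// Insert one rowset into a tablet: begin a txn, prepare and commit the rowset,
// then commit the txn. Optionally returns the rowset id and writes a mock
// segment file through the accessor.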
void insert_rowset(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
int64_t db_id, const std::string& label, int64_t table_id, int64_t partition_id,
int64_t tablet_id, std::string* rowset_id = nullptr,
StorageVaultAccessor* accessor = nullptr) {
int64_t txn_id = 0;
ASSERT_NO_FATAL_FAILURE(
begin_txn(meta_service, cloud_unique_id, db_id, label, table_id, txn_id));
auto rowset = create_rowset(txn_id, tablet_id, partition_id);
if (rowset_id) {
*rowset_id = rowset.rowset_id_v2();
}
ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service, cloud_unique_id, rowset));
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, cloud_unique_id, rowset));
ASSERT_NO_FATAL_FAILURE(commit_txn(meta_service, cloud_unique_id, db_id, txn_id, label));
if (accessor) {
        for (int i = 0; i < 1; ++i) { // one segment per rowset, see create_rowset()
auto path = doris::cloud::segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i);
accessor->put_file(path, "");
}
}
}
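// Insert one rowset into each of the given tablets within a single transaction.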
void insert_rowsets(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
int64_t db_id, const std::string& label, int64_t table_id, int64_t partition_id,
std::vector<int64_t> tablet_ids) {
int64_t txn_id = 0;
ASSERT_NO_FATAL_FAILURE(
begin_txn(meta_service, cloud_unique_id, db_id, label, table_id, txn_id));
for (auto tablet_id : tablet_ids) {
auto rowset = create_rowset(txn_id, tablet_id, partition_id);
ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service, cloud_unique_id, rowset));
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, cloud_unique_id, rowset));
}
ASSERT_NO_FATAL_FAILURE(commit_txn(meta_service, cloud_unique_id, db_id, txn_id, label));
}
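// Fetch the tablet stats of a single tablet.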
void get_tablet_stats(MetaService* meta_service, const std::string& cloud_unique_id,
int64_t tablet_id, TabletStatsPB& stats) {
brpc::Controller cntl;
GetTabletStatsRequest req;
GetTabletStatsResponse res;
req.set_cloud_unique_id(cloud_unique_id);
auto idx = req.add_tablet_idx();
idx->set_tablet_id(tablet_id);
meta_service->get_tablet_stats(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
stats = res.tablet_stats(0);
}
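// Start a compaction job. The input version range is attached and checked only
// when input_version.second > 0.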
void start_compaction_job(MetaService* meta_service, const std::string& cloud_unique_id,
int64_t tablet_id, const std::string& job_id,
const std::string& initiator, int base_compaction_cnt,
int cumu_compaction_cnt, TabletCompactionJobPB::CompactionType type,
std::pair<int64_t, int64_t> input_version = {0, 0}) {
brpc::Controller cntl;
StartTabletJobRequest req;
StartTabletJobResponse res;
req.set_cloud_unique_id(cloud_unique_id);
req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
auto compaction = req.mutable_job()->add_compaction();
compaction->set_id(job_id);
compaction->set_initiator(initiator);
compaction->set_base_compaction_cnt(base_compaction_cnt);
compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt);
compaction->set_type(type);
long now = time(nullptr);
compaction->set_expiration(now + 12);
compaction->set_lease(now + 3);
if (input_version.second > 0) {
compaction->add_input_versions(input_version.first);
compaction->add_input_versions(input_version.second);
compaction->set_check_input_versions_range(true);
}
meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.ShortDebugString();
}
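// Commit a compaction job whose single output rowset covers
// [input_version.first, input_version.second].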
void finish_compaction_job(MetaService* meta_service, const std::string& cloud_unique_id,
int64_t tablet_id, const std::string& job_id,
const std::string& initiator, int base_compaction_cnt,
int cumu_compaction_cnt, TabletCompactionJobPB::CompactionType type,
const std::string& output_rowset_id,
std::pair<int64_t, int64_t> input_version, int64_t txn_id) {
brpc::Controller cntl;
FinishTabletJobRequest req;
FinishTabletJobResponse res;
req.set_cloud_unique_id(cloud_unique_id);
req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
auto compaction = req.mutable_job()->add_compaction();
compaction->set_id(job_id);
compaction->set_initiator(initiator);
compaction->set_base_compaction_cnt(base_compaction_cnt);
compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt);
compaction->set_type(type);
long now = time(nullptr);
compaction->set_expiration(now + 12);
compaction->set_lease(now + 3);
compaction->add_input_versions(input_version.first);
compaction->add_input_versions(input_version.second);
compaction->add_output_versions(input_version.second); // [first, second]
compaction->set_check_input_versions_range(true);
compaction->add_output_rowset_ids(output_rowset_id);
compaction->add_txn_id(txn_id);
req.set_action(FinishTabletJobRequest::COMMIT);
meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.ShortDebugString();
}
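// Simulate a full cumulative compaction: start the job, prepare and commit the
// output rowset covering [start_version, end_version], then commit the job.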
void compact_rowsets_cumulative(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
int64_t db_id, const std::string& label, int64_t table_id,
int64_t partition_id, int64_t tablet_id, int64_t start_version,
int64_t end_version, int num_rows) {
TabletStatsPB stats;
ASSERT_NO_FATAL_FAILURE(get_tablet_stats(meta_service, cloud_unique_id, tablet_id, stats));
int base_compaction_cnt = stats.base_compaction_cnt();
int cumu_compaction_cnt = stats.cumulative_compaction_cnt();
std::string job_id = fmt::format("compaction_{}_{}_{}", tablet_id, start_version, end_version);
ASSERT_NO_FATAL_FAILURE(start_compaction_job(
meta_service, cloud_unique_id, tablet_id, job_id, "test_case", base_compaction_cnt,
cumu_compaction_cnt, TabletCompactionJobPB::CUMULATIVE, {start_version, end_version}));
int64_t txn_id = 123321;
doris::RowsetMetaCloudPB compact_rowset =
create_rowset(txn_id, tablet_id, partition_id, start_version, num_rows);
std::string output_rowset_id = compact_rowset.rowset_id_v2();
compact_rowset.set_end_version(end_version);
ASSERT_NO_FATAL_FAILURE(prepare_rowset(meta_service, cloud_unique_id, compact_rowset));
ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, cloud_unique_id, compact_rowset));
ASSERT_NO_FATAL_FAILURE(finish_compaction_job(
meta_service, cloud_unique_id, tablet_id, job_id, "test_case", base_compaction_cnt,
cumu_compaction_cnt, TabletCompactionJobPB::CUMULATIVE, output_rowset_id,
{start_version, end_version}, txn_id));
}
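// Start a schema change job that converts tablet_id into new_tablet_id.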
void start_schema_change_job(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
int64_t table_id, int64_t index_id, int64_t partition_id,
int64_t tablet_id, int64_t new_tablet_id, const std::string& job_id,
const std::string& initiator, int64_t alter_version = -1) {
brpc::Controller cntl;
StartTabletJobRequest req;
StartTabletJobResponse res;
req.set_cloud_unique_id(cloud_unique_id);
req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
auto sc = req.mutable_job()->mutable_schema_change();
sc->set_id(job_id);
sc->set_initiator(initiator);
sc->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id);
if (alter_version != -1) {
sc->set_alter_version(alter_version);
}
long now = time(nullptr);
sc->set_expiration(now + 12);
meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK)
<< job_id << ' ' << initiator << ' ' << res.status().msg();
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK)
<< job_id << ' ' << initiator;
}
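// Finish a schema change job, aggregating output stats from the given rowsets.
// Only the RPC itself is checked here; callers verify the resulting state.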
void finish_schema_change_job(MetaService* meta_service, const std::string& cloud_unique_id,
int64_t tablet_id, int64_t new_tablet_id, const std::string& job_id,
const std::string& initiator,
const std::vector<doris::RowsetMetaCloudPB>& output_rowsets,
FinishTabletJobRequest_Action action = FinishTabletJobRequest::COMMIT,
int64_t delete_bitmap_lock_initiator = 12345) {
brpc::Controller cntl;
FinishTabletJobRequest req;
FinishTabletJobResponse res;
req.set_cloud_unique_id(cloud_unique_id);
req.set_action(action);
req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
auto sc = req.mutable_job()->mutable_schema_change();
sc->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id);
if (output_rowsets.empty()) {
sc->set_alter_version(0);
} else {
sc->set_alter_version(output_rowsets.back().end_version());
for (auto& rowset : output_rowsets) {
sc->add_txn_ids(rowset.txn_id());
sc->add_output_versions(rowset.end_version());
sc->set_num_output_rows(sc->num_output_rows() + rowset.num_rows());
sc->set_num_output_segments(sc->num_output_segments() + rowset.num_segments());
sc->set_size_output_rowsets(sc->size_output_rowsets() + rowset.total_disk_size());
sc->set_index_size_output_rowsets(sc->index_size_output_rowsets() +
rowset.index_disk_size());
sc->set_segment_size_output_rowsets(sc->segment_size_output_rowsets() +
rowset.data_disk_size());
}
sc->set_num_output_rowsets(output_rowsets.size());
}
sc->set_id(job_id);
sc->set_initiator(initiator);
sc->set_delete_bitmap_lock_initiator(delete_bitmap_lock_initiator);
    meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
}
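// Fetch the rowset metas of a tablet within [start_version, end_version].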
void get_rowsets(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
int64_t tablet_id, int64_t start_version, int64_t end_version,
std::vector<doris::RowsetMetaCloudPB>& rowsets) {
TabletStatsPB stats;
ASSERT_NO_FATAL_FAILURE(get_tablet_stats(meta_service, cloud_unique_id, tablet_id, stats));
brpc::Controller cntl;
GetRowsetRequest req;
GetRowsetResponse res;
req.set_cloud_unique_id(cloud_unique_id);
req.set_base_compaction_cnt(stats.base_compaction_cnt());
req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
if (stats.has_full_compaction_cnt()) {
req.set_full_compaction_cnt(stats.full_compaction_cnt());
}
req.set_cumulative_point(stats.cumulative_point());
req.set_start_version(start_version);
req.set_end_version(end_version);
req.mutable_idx()->set_tablet_id(tablet_id);
meta_service->get_rowset(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.ShortDebugString();
for (int i = 0; i < res.rowset_meta_size(); i++) {
rowsets.push_back(res.rowset_meta(i));
}
}
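// Fetch the instance info via the meta service.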
void get_instance(MetaServiceProxy* meta_service, const std::string& cloud_unique_id,
InstanceInfoPB& instance_info) {
brpc::Controller cntl;
GetInstanceRequest req;
GetInstanceResponse res;
req.set_cloud_unique_id(cloud_unique_id);
meta_service->get_instance(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.ShortDebugString();
ASSERT_TRUE(res.has_instance()) << res.ShortDebugString();
instance_info.CopyFrom(res.instance());
}
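// Mark an instance as dropped.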
void drop_instance(MetaServiceProxy* meta_service, const std::string& instance_id) {
brpc::Controller cntl;
AlterInstanceRequest req;
AlterInstanceResponse res;
req.set_instance_id(instance_id);
req.set_op(AlterInstanceRequest::DROP);
meta_service->alter_instance(&cntl, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.ShortDebugString();
}
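// Build an InstanceRecycler that uses the given accessor for RESOURCE_ID.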
std::unique_ptr<InstanceRecycler> get_instance_recycler(
MetaServiceProxy* meta_service, const InstanceInfoPB& instance_info,
std::shared_ptr<StorageVaultAccessor> accessor) {
auto txn_lazy_committer = std::make_shared<TxnLazyCommitter>(meta_service->txn_kv());
auto recycler = std::make_unique<InstanceRecycler>(meta_service->txn_kv(), instance_info,
thread_group, txn_lazy_committer);
recycler->TEST_add_accessor(RESOURCE_ID, accessor);
return recycler;
}
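// Overload that uses a fresh MockAccessor.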
std::unique_ptr<InstanceRecycler> get_instance_recycler(MetaServiceProxy* meta_service,
const InstanceInfoPB& instance_info) {
std::shared_ptr<StorageVaultAccessor> accessor = std::make_shared<MockAccessor>();
return get_instance_recycler(meta_service, instance_info, accessor);
}
// Convert a string to a hex-escaped string.
// A non-printable character is represented as \xHH, where HH is its hexadecimal
// value; a printable character is represented as itself.
// For example, the 3-byte string "a\x01b" becomes the 6-character string a\x01b.
std::string escape_hex(std::string_view data) {
std::string result;
for (char c : data) {
        if (isprint(static_cast<unsigned char>(c))) { // cast avoids UB for chars >= 0x80
result += c;
} else {
result += fmt::format("\\x{:02x}", static_cast<unsigned char>(c));
}
}
return result;
}
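// Count the KV pairs in the range [begin, end).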
size_t count_range(TxnKv* txn_kv, std::string_view begin = "", std::string_view end = "\xFF") {
std::unique_ptr<Transaction> txn;
EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
if (!txn) {
return 0; // Failed to create transaction
}
FullRangeGetOptions opts;
opts.txn = txn.get();
auto iter = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(opts));
size_t total = 0;
for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
total += 1;
}
    EXPECT_TRUE(iter->is_valid()); // The iterator should remain valid after iteration completes.
return total;
}
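// Dump every KV pair in [begin, end) for debugging, one line per pair.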
std::string dump_range(TxnKv* txn_kv, std::string_view begin = "", std::string_view end = "\xFF") {
std::unique_ptr<Transaction> txn;
if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
return "Failed to create dump range transaction";
}
FullRangeGetOptions opts;
opts.txn = txn.get();
auto iter = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(opts));
std::string buffer;
for (auto&& kv = iter->next(); kv.has_value(); kv = iter->next()) {
buffer += fmt::format("Key: {}, Value: {}, KeyHex: {}\n", escape_hex(kv->first),
escape_hex(kv->second), hex(kv->first));
}
    EXPECT_TRUE(iter->is_valid()); // The iterator should remain valid after iteration completes.
return buffer;
}
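// Verify the rowset metas of a tablet over [start_version, end_version]:
//  1. load and compact rowsets do not overlap,
//  2. their versions form a contiguous range without holes,
//  3. every surviving rowset still has its data ref count key.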
void check_rowset_meta(TxnKv* txn_kv, std::string_view instance_id, int64_t tablet_id,
int64_t start_version, int64_t end_version) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
// 1. there is no overlap
std::map<int64_t, RowsetMetaCloudPB> rowset_graph;
{
std::string start_key =
versioned::meta_rowset_load_key({instance_id, tablet_id, start_version});
std::string end_key =
versioned::meta_rowset_load_key({instance_id, tablet_id, end_version});
// [start, end]
versioned::ReadDocumentMessagesOptions options;
options.exclude_begin_key = false;
options.exclude_end_key = false;
auto iter = versioned::document_get_range<RowsetMetaCloudPB>(txn.get(), start_key, end_key,
options);
        for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
            auto&& [key, version, rowset_meta] = *kvp;
            int64_t rs_end_version = rowset_meta.end_version();
            auto result = rowset_graph.emplace(rs_end_version, std::move(rowset_meta));
            // Log through the local copy: rowset_meta has been moved from.
            EXPECT_TRUE(result.second) << "insert load rowset meta: " << rs_end_version;
        }
ASSERT_TRUE(iter->is_valid())
<< "failed to get loaded rowset metas, error_code=" << iter->error_code();
}
{
std::string start_key =
versioned::meta_rowset_compact_key({instance_id, tablet_id, start_version});
std::string end_key =
versioned::meta_rowset_compact_key({instance_id, tablet_id, end_version});
// [start, end]
versioned::ReadDocumentMessagesOptions options;
options.exclude_begin_key = false;
options.exclude_end_key = false;
int64_t last_start_version = std::numeric_limits<int64_t>::max();
auto iter = versioned::document_get_range<RowsetMetaCloudPB>(txn.get(), start_key, end_key,
options);
        for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
            auto&& [key, version, rowset_meta] = *kvp;
            int64_t rs_start_version = rowset_meta.start_version();
            int64_t rs_end_version = rowset_meta.end_version();
            EXPECT_GT(last_start_version, rs_start_version)
                    << "This compact rowset has been covered by a larger compact rowset";
            last_start_version = rs_start_version;
            // Any load rowsets covered by this compact rowset should already have
            // been recycled, so the erased range is expected to be empty.
            auto lower = rowset_graph.lower_bound(rs_start_version);
            auto upper = rowset_graph.upper_bound(rs_end_version);
            EXPECT_EQ(lower, upper)
                    << "There is overlap between rowsets, start_version=" << rs_start_version
                    << ", end_version=" << rs_end_version;
            rowset_graph.erase(lower, upper);
            auto result = rowset_graph.emplace(rs_end_version, std::move(rowset_meta));
            EXPECT_TRUE(result.second) << "insert compact rowset meta: " << rs_end_version;
        }
ASSERT_TRUE(iter->is_valid())
<< "failed to get compact rowset metas, error_code=" << iter->error_code();
}
// 2. there is no hole
int64_t expect_version = start_version;
for (auto&& [end_version, rowset_meta] : rowset_graph) {
EXPECT_EQ(rowset_meta.start_version(), expect_version)
<< "There is a hole between rowsets, expected start_version=" << expect_version
<< ", but got " << rowset_meta.start_version();
expect_version = end_version + 1;
}
// 3. the data ref count is correct
for (auto&& [end_version, rowset_meta] : rowset_graph) {
std::string data_reference_key = versioned::data_rowset_ref_count_key(
{instance_id, tablet_id, rowset_meta.rowset_id_v2()});
std::string data_reference_value;
auto rc = txn->get(data_reference_key, &data_reference_value);
ASSERT_EQ(rc, TxnErrorCode::TXN_OK)
<< "failed to get data reference key: " << data_reference_key;
}
}
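// Verify that the data ref count keys of the given rowsets have been recycled.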
void check_no_specified_rowset_meta(TxnKv* txn_kv, std::string_view instance_id, int64_t tablet_id,
const std::vector<std::string>& rowset_ids) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    for (const auto& rowset_id : rowset_ids) {
        std::string data_reference_key =
                versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
        std::string data_reference_value;
        auto rc = txn->get(data_reference_key, &data_reference_value);
        ASSERT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND)
                << "data reference key should have been recycled: " << data_reference_key
                << ", value=" << hex(data_reference_value);
    }
}
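// Collect every (value, versionstamp) pair stored under a versioned key.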
void versioned_get_all(TxnKv* txn_kv, std::string key,
std::vector<std::pair<std::string, Versionstamp>>& values) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = encode_versioned_key(key, Versionstamp::min());
std::string end_key = encode_versioned_key(key, Versionstamp::max());
FullRangeGetOptions opts;
auto iter = txn->full_range_get(begin_key, end_key, opts);
    for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
        auto&& [versioned_key, value] = *kvp; // versioned_key = key + versionstamp suffix
        std::string_view key_part(versioned_key);
        Versionstamp version;
        EXPECT_EQ(decode_tailing_versionstamp_end(&key_part), 0);
        EXPECT_EQ(decode_tailing_versionstamp(&key_part, &version), 0);
        values.emplace_back(value, version);
    }
ASSERT_TRUE(iter->is_valid()) << "failed to get all versioned values, error_code="
<< iter->error_code();
}
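// Verify that exactly one versioned entry remains for the table version,
// partition version, index meta, and partition meta keys.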
void check_partition_resources(TxnKv* txn_kv, std::string_view instance_id, int64_t table_id,
int64_t index_id, int64_t partition_id) {
// 1. check table version
std::vector<std::pair<std::string, Versionstamp>> values;
std::string table_version_key = versioned::table_version_key({instance_id, table_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, table_version_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one table version";
// 2. check partition version
values.clear();
std::string partition_version_key =
versioned::partition_version_key({instance_id, partition_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, partition_version_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one partition version";
// 3. check index meta
values.clear();
std::string index_meta_key = versioned::meta_index_key({instance_id, index_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, index_meta_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one index meta";
// 4. check partition meta
values.clear();
std::string partition_meta_key = versioned::meta_partition_key({instance_id, partition_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, partition_meta_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one partition meta";
}
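// Verify that exactly one versioned entry remains for the tablet meta, load
// stats, and compaction stats keys.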
void check_tablet_meta(TxnKv* txn_kv, std::string_view instance_id, int64_t tablet_id) {
// 1. check tablet meta
std::vector<std::pair<std::string, Versionstamp>> values;
std::string tablet_meta_key = versioned::meta_tablet_key({instance_id, tablet_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, tablet_meta_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one tablet meta, " << dump_range(txn_kv);
// 2. check tablet load stats
values.clear();
std::string tablet_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, tablet_stats_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one load stats key";
// 3. check tablet compaction stats
values.clear();
std::string tablet_compaction_stats_key =
versioned::tablet_compact_stats_key({instance_id, tablet_id});
ASSERT_NO_FATAL_FAILURE(versioned_get_all(txn_kv, tablet_compaction_stats_key, values));
ASSERT_EQ(values.size(), 1) << "There should be only one compaction stats key";
}
} // namespace
// A test that simulates a compaction operation and recycles the input rowsets.
TEST(RecycleVersionedKeysTest, RecycleRowset_Compaction) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_rowset_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
// create partition/index/tablet
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_input_rowsets = 3;
std::vector<int64_t> input_versions;
std::vector<std::string> input_rowset_ids;
{
// insert some rowsets
for (size_t i = 0; i < num_input_rowsets; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
input_versions.push_back(i + 2); // versions start from 2
input_rowset_ids.push_back(rowset_id);
}
// insert other rowsets
for (size_t i = 0; i < 2; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("other_label_{}", i), table_id,
partition_id, tablet_id));
}
}
int64_t start_version = input_versions.front();
int64_t end_version = input_versions.back();
int64_t last_version = end_version + 2;
{
// simulate a compaction operation and recycle the input rowsets
ASSERT_NO_FATAL_FAILURE(compact_rowsets_cumulative(
meta_service.get(), cloud_unique_id, db_id, "compaction_label", table_id,
partition_id, tablet_id, start_version, end_version,
300)); // num_rows for output rowset
}
{
// recycle the rowsets
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_rowsets(), 0);
}
ASSERT_NO_FATAL_FAILURE({
// The input rowsets are recycled.
check_no_specified_rowset_meta(txn_kv.get(), instance_id, tablet_id, input_rowset_ids);
check_rowset_meta(txn_kv.get(), instance_id, tablet_id, start_version, last_version);
check_partition_resources(txn_kv.get(), instance_id, table_id, index_id, partition_id);
check_tablet_meta(txn_kv.get(), instance_id, tablet_id);
});
}
// A test that simulates a schema change operation
TEST(RecycleVersionedKeysTest, RecycleRowset_SchemaChange) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_schema_change_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
int64_t new_tablet_id = 6;
{
// create partition/index/tablet
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_input_rowsets = 4;
std::vector<int64_t> input_versions;
{
// insert some rowsets
for (size_t i = 0; i < num_input_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id));
input_versions.push_back(i + 2); // versions start from 2
}
}
{
// Create new tablet and insert some rowsets to both new and old tablet.
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, new_tablet_id,
TabletStatePB::PB_NOTREADY));
for (size_t i = 0; i < 2; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowsets(meta_service.get(), cloud_unique_id, db_id,
fmt::format("new_label_{}", i), table_id,
partition_id, {new_tablet_id, tablet_id}));
}
}
std::vector<doris::RowsetMetaCloudPB> output_rowsets;
{
// simulate a schema change operation and recycle the input rowsets
std::string job_id = fmt::format("schema_change_{}_{}", tablet_id, new_tablet_id);
int64_t alter_version = input_versions.back();
ASSERT_NO_FATAL_FAILURE(start_schema_change_job(
meta_service.get(), cloud_unique_id, table_id, index_id, partition_id, tablet_id,
new_tablet_id, job_id, "test_case", alter_version));
// Create output rowsets for new tablet
for (auto version : input_versions) {
int64_t txn_id = 100000 + version; // Use different txn_id for schema change output
auto output_rowset = create_rowset(txn_id, new_tablet_id, partition_id, version, 100);
output_rowsets.push_back(output_rowset);
ASSERT_NO_FATAL_FAILURE(
prepare_rowset(meta_service.get(), cloud_unique_id, output_rowset));
ASSERT_NO_FATAL_FAILURE(
commit_rowset(meta_service.get(), cloud_unique_id, output_rowset));
}
ASSERT_NO_FATAL_FAILURE(finish_schema_change_job(meta_service.get(), cloud_unique_id,
tablet_id, new_tablet_id, job_id,
"test_case", output_rowsets));
}
{
// recycle the rowsets
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_rowsets(), 0);
}
ASSERT_NO_FATAL_FAILURE({
int64_t start_version = input_versions.front();
int64_t end_version = input_versions.back() + 2;
check_rowset_meta(txn_kv.get(), instance_id, tablet_id, start_version, end_version);
check_rowset_meta(txn_kv.get(), instance_id, new_tablet_id, start_version, end_version);
check_tablet_meta(txn_kv.get(), instance_id, tablet_id);
check_tablet_meta(txn_kv.get(), instance_id, new_tablet_id);
check_partition_resources(txn_kv.get(), instance_id, table_id, index_id, partition_id);
});
{
std::vector<RowsetMetaCloudPB> rowsets;
ASSERT_NO_FATAL_FAILURE(get_rowsets(meta_service.get(), cloud_unique_id, new_tablet_id,
input_versions.front(), input_versions.back(),
rowsets));
ASSERT_EQ(rowsets.size(), output_rowsets.size());
for (size_t i = 0; i < rowsets.size(); ++i) {
ASSERT_EQ(rowsets[i].rowset_id_v2(), output_rowsets[i].rowset_id_v2());
}
}
}
// A test that simulates a schema change operation and recycles the input rowsets.
TEST(RecycleVersionedKeysTest, RecycleRowset_SchemaChangeOverwrite) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_schema_change_overwrite_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
int64_t new_tablet_id = 6;
{
// create partition/index/tablet
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
    int64_t start_version = 2, end_version = 1; // end_version advances as rowsets are inserted
{
// insert some rowsets
for (size_t i = 0; i < 4; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id));
end_version += 1;
}
}
{
// Create new tablet and insert some rowsets to both new and old tablet.
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, new_tablet_id,
TabletStatePB::PB_NOTREADY));
for (size_t i = 0; i < 2; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowsets(meta_service.get(), cloud_unique_id, db_id,
fmt::format("new_label_{}", i), table_id,
partition_id, {new_tablet_id, tablet_id}));
end_version += 1;
}
}
std::vector<doris::RowsetMetaCloudPB> output_rowsets;
{
// simulate a schema change operation and recycle the input rowsets
std::string job_id = fmt::format("schema_change_{}_{}", tablet_id, new_tablet_id);
int64_t alter_version = end_version;
ASSERT_NO_FATAL_FAILURE(start_schema_change_job(
meta_service.get(), cloud_unique_id, table_id, index_id, partition_id, tablet_id,
new_tablet_id, job_id, "test_case", alter_version));
// Create output rowsets for new tablet
for (int64_t version = start_version; version <= end_version; ++version) {
int64_t txn_id = 100000 + version; // Use different txn_id for schema change output
auto output_rowset = create_rowset(txn_id, new_tablet_id, partition_id, version, 100);
output_rowsets.push_back(output_rowset);
ASSERT_NO_FATAL_FAILURE(
prepare_rowset(meta_service.get(), cloud_unique_id, output_rowset));
ASSERT_NO_FATAL_FAILURE(
commit_rowset(meta_service.get(), cloud_unique_id, output_rowset));
}
ASSERT_NO_FATAL_FAILURE(finish_schema_change_job(meta_service.get(), cloud_unique_id,
tablet_id, new_tablet_id, job_id,
"test_case", output_rowsets));
}
{
// recycle the rowsets
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_rowsets(), 0);
}
ASSERT_NO_FATAL_FAILURE({
check_rowset_meta(txn_kv.get(), instance_id, tablet_id, start_version, end_version);
check_rowset_meta(txn_kv.get(), instance_id, new_tablet_id, start_version, end_version);
check_tablet_meta(txn_kv.get(), instance_id, tablet_id);
check_tablet_meta(txn_kv.get(), instance_id, new_tablet_id);
check_partition_resources(txn_kv.get(), instance_id, table_id, index_id, partition_id);
});
{
std::vector<RowsetMetaCloudPB> rowsets;
ASSERT_NO_FATAL_FAILURE(get_rowsets(meta_service.get(), cloud_unique_id, new_tablet_id,
start_version, end_version, rowsets));
ASSERT_EQ(rowsets.size(), output_rowsets.size());
for (size_t i = 0; i < rowsets.size(); ++i) {
ASSERT_EQ(rowsets[i].rowset_id_v2(), output_rowsets[i].rowset_id_v2());
}
}
}
// A test that simulates a schema change operation followed by a compaction, then recycles the input rowsets.
TEST(RecycleVersionedKeysTest, RecycleRowset_SchemaChangeAndCompaction) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_schema_change_compaction_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
int64_t new_tablet_id = 6;
{
// create partition/index/tablet
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_input_rowsets = 5;
std::vector<int64_t> input_versions;
{
// insert some rowsets
for (size_t i = 0; i < num_input_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id));
input_versions.push_back(i + 2); // versions start from 2
}
}
std::vector<doris::RowsetMetaCloudPB> schema_change_output_rowsets;
{
// simulate a schema change operation and recycle the input rowsets
std::string job_id = fmt::format("schema_change_{}_{}", tablet_id, new_tablet_id);
int64_t alter_version = input_versions.back();
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, new_tablet_id,
TabletStatePB::PB_NOTREADY));
ASSERT_NO_FATAL_FAILURE(start_schema_change_job(
meta_service.get(), cloud_unique_id, table_id, index_id, partition_id, tablet_id,
new_tablet_id, job_id, "test_case", alter_version));
// Create output rowsets for new tablet
for (auto version : input_versions) {
int64_t txn_id = 100000 + version; // Use different txn_id for schema change output
auto output_rowset = create_rowset(txn_id, new_tablet_id, partition_id, version, 100);
schema_change_output_rowsets.push_back(output_rowset);
ASSERT_NO_FATAL_FAILURE(
prepare_rowset(meta_service.get(), cloud_unique_id, output_rowset));
ASSERT_NO_FATAL_FAILURE(
commit_rowset(meta_service.get(), cloud_unique_id, output_rowset));
}
ASSERT_NO_FATAL_FAILURE(finish_schema_change_job(
meta_service.get(), cloud_unique_id, tablet_id, new_tablet_id, job_id, "test_case",
schema_change_output_rowsets));
}
int64_t compaction_start_version = input_versions[1]; // Start from version 3
int64_t compaction_end_version = input_versions[3]; // End at version 5
{
// simulate a compaction operation and recycle the input rowsets
ASSERT_NO_FATAL_FAILURE(compact_rowsets_cumulative(
meta_service.get(), cloud_unique_id, db_id, "compaction_label", table_id,
partition_id, new_tablet_id, compaction_start_version, compaction_end_version,
300)); // num_rows for output rowset
}
{
// recycle the rowsets
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_rowsets(), 0);
}
ASSERT_NO_FATAL_FAILURE({
int64_t start_version = input_versions.front();
int64_t end_version = input_versions.back();
check_rowset_meta(txn_kv.get(), instance_id, tablet_id, start_version, end_version);
check_rowset_meta(txn_kv.get(), instance_id, new_tablet_id, start_version, end_version);
check_tablet_meta(txn_kv.get(), instance_id, tablet_id);
check_tablet_meta(txn_kv.get(), instance_id, new_tablet_id);
check_partition_resources(txn_kv.get(), instance_id, table_id, index_id, partition_id);
});
}
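// A test that recycles a tablet whose last rowset carries an extra data ref
// count; the shared data must survive while everything else is recycled.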
TEST(RecycleVersionedKeysTest, RecycleTabletWithRowsetRefCount) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_tablet_with_rowset_ref_count_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
// prepare index, partition, and tablets
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 4;
std::string last_rowset_id;
std::shared_ptr<StorageVaultAccessor> accessor = std::make_shared<MockAccessor>();
{
// insert some rowsets
for (size_t i = 0; i < num_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &last_rowset_id));
accessor->put_file(rowset_path_prefix(tablet_id, last_rowset_id) + "0.dat",
"test data");
}
}
{
// Create a rowset reference to the last rowset
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
auto rowset_ref_count_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, last_rowset_id});
txn->atomic_add(rowset_ref_count_key, 1);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
// recycle the rowsets
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info, accessor);
RecyclerMetricsContext ctx;
ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
{
// check rowset does not exist on s3
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
ASSERT_TRUE(list_iter->has_next()) << dump_range(txn_kv.get());
ASSERT_EQ(list_iter->next().value().path,
rowset_path_prefix(tablet_id, last_rowset_id) + "0.dat");
ASSERT_FALSE(list_iter->has_next());
}
{
// check the rowset ref count key exists
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
std::string end_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t total_ref_count_keys = 0;
while (it->has_next()) {
auto [k, v] = it->next();
int64_t ref_count = 0;
ASSERT_TRUE(txn->decode_atomic_int(v, &ref_count));
ASSERT_EQ(ref_count, 1);
++total_ref_count_keys;
}
ASSERT_EQ(total_ref_count_keys, 1);
}
}
// Clone tablet metadata from source tablet to target tablet, sharing the same rowset data
void clone_tablet(TxnKv* txn_kv, std::string_view source_instance_id,
std::string_view target_instance_id, int64_t tablet_id) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
// 1. Clone tablet meta
{
std::string source_key = versioned::meta_tablet_key({source_instance_id, tablet_id});
std::string target_key = versioned::meta_tablet_key({target_instance_id, tablet_id});
Versionstamp version;
std::string value;
ASSERT_EQ(versioned_get(txn.get(), source_key, &version, &value), TxnErrorCode::TXN_OK);
// Decode and modify tablet meta
TabletMetaCloudPB tablet_meta;
ASSERT_TRUE(tablet_meta.ParseFromString(value));
// Write to target tablet
ASSERT_TRUE(versioned::document_put(txn.get(), target_key, std::move(tablet_meta)));
}
// 2. Clone tablet load stats
{
std::string source_key = versioned::tablet_load_stats_key({source_instance_id, tablet_id});
std::string target_key = versioned::tablet_load_stats_key({target_instance_id, tablet_id});
Versionstamp version;
std::string value;
ASSERT_EQ(versioned_get(txn.get(), source_key, &version, &value), TxnErrorCode::TXN_OK);
// Decode and modify load stats (TabletStatsPB doesn't have set_tablet_id, just copy as is)
TabletStatsPB stats;
ASSERT_TRUE(stats.ParseFromString(value));
// Write to target tablet
ASSERT_TRUE(versioned::document_put(txn.get(), target_key, std::move(stats)));
}
// 3. Clone tablet compaction stats
{
std::string source_key =
versioned::tablet_compact_stats_key({source_instance_id, tablet_id});
std::string target_key =
versioned::tablet_compact_stats_key({target_instance_id, tablet_id});
Versionstamp version;
std::string value;
ASSERT_EQ(versioned_get(txn.get(), source_key, &version, &value), TxnErrorCode::TXN_OK);
// Decode stats and write to target tablet
TabletStatsPB stats;
ASSERT_TRUE(stats.ParseFromString(value));
ASSERT_TRUE(versioned::document_put(txn.get(), target_key, std::move(stats)));
}
// 4. Clone rowset metas and increment ref counts
{
// Get all rowset metas for source tablet
std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>> rowset_metas;
MetaReader meta_reader(source_instance_id);
ASSERT_EQ(meta_reader.get_load_rowset_metas(txn.get(), tablet_id, &rowset_metas),
TxnErrorCode::TXN_OK);
for (auto&& [rowset_meta, version] : rowset_metas) {
// Create new rowset meta key for target tablet
std::string target_rowset_key = versioned::meta_rowset_load_key(
{target_instance_id, tablet_id, rowset_meta.end_version()});
// Modify rowset meta to point to target tablet
RowsetMetaCloudPB target_rowset_meta = rowset_meta;
target_rowset_meta.set_reference_instance_id(std::string(source_instance_id));
ASSERT_TRUE(versioned::document_put(txn.get(), target_rowset_key,
std::move(target_rowset_meta)));
// Increment ref count for shared rowset data
std::string ref_count_key = versioned::data_rowset_ref_count_key(
{source_instance_id, tablet_id, rowset_meta.rowset_id_v2()});
txn->atomic_add(ref_count_key, 1);
}
}
// 5. Handle compaction rowset metas similarly
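    // (They are re-registered under meta_rowset_load_key in the target instance.)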
{
std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>> rowset_metas;
MetaReader meta_reader(source_instance_id);
ASSERT_EQ(meta_reader.get_compact_rowset_metas(txn.get(), tablet_id, &rowset_metas),
TxnErrorCode::TXN_OK);
for (auto&& [rowset_meta, version] : rowset_metas) {
// Create new rowset meta key for target tablet
std::string target_rowset_key = versioned::meta_rowset_load_key(
{target_instance_id, tablet_id, rowset_meta.end_version()});
// Modify rowset meta to point to target tablet
RowsetMetaCloudPB target_rowset_meta = rowset_meta;
target_rowset_meta.set_reference_instance_id(std::string(source_instance_id));
ASSERT_TRUE(versioned::document_put(txn.get(), target_rowset_key,
std::move(target_rowset_meta)));
// Increment ref count for shared rowset data
std::string ref_count_key = versioned::data_rowset_ref_count_key(
{source_instance_id, tablet_id, rowset_meta.rowset_id_v2()});
txn->atomic_add(ref_count_key, 1);
}
}
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
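// Clone a tablet into several instances, then run one recycler per instance
// concurrently; all shared rowset data and ref count keys must be cleaned up.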
void recycle_tablet_with_rowset_ref_count_concurrent() {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_tablet_with_rowset_ref_count_concurrent_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
std::vector<std::string> clone_instance_ids;
size_t num_cloned_instances = 1;
for (size_t i = 0; i < num_cloned_instances; ++i) {
clone_instance_ids.push_back(instance_id + fmt::format("_clone_{}", i));
ASSERT_NO_FATAL_FAILURE(
create_and_refresh_instance(meta_service.get(), clone_instance_ids.back()));
}
{
// prepare index, partition, and source tablet
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
std::shared_ptr<StorageVaultAccessor> accessor = std::make_shared<MockAccessor>();
size_t num_rowsets = 4;
std::vector<std::string> rowset_ids;
{
// insert some rowsets to source tablet
for (size_t i = 0; i < num_rowsets; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
rowset_ids.push_back(rowset_id);
accessor->put_file(rowset_path_prefix(tablet_id, rowset_id) + "0.dat", "mock_data");
}
}
{
// Clone tablets sharing the same rowset data
for (auto& clone_instance_id : clone_instance_ids) {
ASSERT_NO_FATAL_FAILURE(
clone_tablet(txn_kv.get(), instance_id, clone_instance_id, tablet_id));
}
}
// Verify initial ref counts
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
for (const std::string& rowset_id : rowset_ids) {
std::string source_ref_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
std::string source_ref_value;
auto rc = txn->get(source_ref_key, &source_ref_value);
ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
int64_t ref_count = 0;
ASSERT_TRUE(txn->decode_atomic_int(source_ref_value, &ref_count));
ASSERT_EQ(ref_count, 1 + clone_instance_ids.size());
}
}
// Prepare for concurrent recycling
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
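// force_immediate_recycle makes the recycler skip retention delays so the
// tablet is processed right away; the DEFER above restores the default.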
clone_instance_ids.push_back(instance_id); // Include source instance for recycling
std::vector<InstanceInfoPB> instance_infos;
for (const auto& clone_instance_id : clone_instance_ids) {
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(
meta_service.get(), fmt::format("1:{}:0", clone_instance_id), instance_info));
instance_infos.push_back(instance_info);
}
// Launch one recycler per instance (clones plus the source) concurrently
num_cloned_instances = clone_instance_ids.size(); // now includes the source instance
std::vector<std::thread> recycler_threads;
std::vector<int> recycler_results(num_cloned_instances, -1);
std::atomic<int> completed_recyclers {0};
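// Each decrement of a shared ref count is transactional; only the recycler
// that drives a count to zero is expected to delete the underlying data, so
// every thread should succeed regardless of interleaving.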
for (size_t i = 0; i < num_cloned_instances; ++i) {
recycler_threads.emplace_back([&, i]() {
InstanceInfoPB instance_info = instance_infos[i];
auto recycler = get_instance_recycler(meta_service.get(), instance_info, accessor);
RecyclerMetricsContext ctx;
recycler_results[i] = recycler->recycle_tablet(tablet_id, ctx);
completed_recyclers.fetch_add(1);
});
}
// Wait for all recyclers to complete
for (auto& thread : recycler_threads) {
thread.join();
}
ASSERT_EQ(completed_recyclers.load(), num_cloned_instances) << "All recyclers should complete";
bool any_failure = false;
for (int result : recycler_results) {
if (result != 0) {
any_failure = true;
break;
}
}
ASSERT_FALSE(any_failure) << "All recyclers should succeed";
{
// Verify all rowset ref count keys are cleaned up
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
// Check source tablet
std::string begin_key = versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
std::string end_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t source_ref_count_keys = 0;
while (it->has_next()) {
it->next(); // Consume the entry
++source_ref_count_keys;
}
EXPECT_EQ(source_ref_count_keys, 0) << "Source tablet ref count keys should be cleaned up. "
<< dump_range(txn_kv.get(), begin_key, end_key);
}
{
// Verify all rowset data objects are cleaned up from the accessor. The source
// and clone instances share the same mock accessor and tablet path, so a
// single listing covers both.
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
EXPECT_FALSE(list_iter->has_next()) << "Shared tablet data should be cleaned up";
}
}
TEST(RecycleVersionedKeysTest, RecycleTabletWithRowsetRefCountConcurrent) {
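// Runs once by default; bump the bound locally to stress the race repeatedly.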
for (size_t times = 0; times < 1; ++times) {
ASSERT_NO_FATAL_FAILURE(recycle_tablet_with_rowset_ref_count_concurrent());
}
}
// A test that simulates a drop index operation.
TEST(RecycleVersionedKeysTest, RecycleIndex) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_index_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
// prepare index, partition, and tablets
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 5;
{
// insert some rowsets
for (size_t i = 0; i < num_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id));
}
}
{
// drop index
ASSERT_NO_FATAL_FAILURE(
drop_index(meta_service.get(), cloud_unique_id, db_id, table_id, index_id));
}
{
// run recycler to recycle the dropped index
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_indexes(), 0);
}
{
// check the rowsets and tablets are recycled, but partition meta still exists
std::string meta_key = versioned::meta_key_prefix(instance_id);
std::string meta_key_end = versioned::meta_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), meta_key, meta_key_end), 1)
<< dump_range(txn_kv.get(), meta_key, meta_key_end);
std::string data_key = versioned::data_key_prefix(instance_id);
std::string data_key_end = versioned::data_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), data_key, data_key_end), 0)
<< dump_range(txn_kv.get(), data_key, data_key_end);
std::string index_key = versioned::index_key_prefix(instance_id);
std::string index_key_end = versioned::index_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), index_key, index_key_end), 2) // partition indexes
<< dump_range(txn_kv.get(), index_key, index_key_end);
std::string stats_key = versioned::stats_key_prefix(instance_id);
std::string stats_key_end = versioned::stats_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), stats_key, stats_key_end), 0)
<< dump_range(txn_kv.get(), stats_key, stats_key_end);
// But the version keys still exist (table and partition versions)
std::string version_key = versioned::version_key_prefix(instance_id);
std::string version_key_end = versioned::version_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), version_key, version_key_end), 2)
<< dump_range(txn_kv.get(), version_key, version_key_end);
}
}
// A test that simulates a drop partition operation.
TEST(RecycleVersionedKeysTest, RecyclePartition) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_partition_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
// create partition/index/tablet
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 3;
{
// insert some rowsets
for (size_t i = 0; i < num_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id));
}
}
{
// drop partition
ASSERT_NO_FATAL_FAILURE(drop_partition(meta_service.get(), cloud_unique_id, db_id, table_id,
partition_id, index_id));
}
{
// run recycler to recycle the dropped partition
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_partitions(), 0);
}
{
// check the rowsets and tablets are recycled
std::string meta_key = versioned::meta_key_prefix(instance_id);
std::string meta_key_end = versioned::meta_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), meta_key, meta_key_end), 2)
<< dump_range(txn_kv.get(), meta_key, meta_key_end);
std::string data_key = versioned::data_key_prefix(instance_id);
std::string data_key_end = versioned::data_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), data_key, data_key_end), 0)
<< dump_range(txn_kv.get(), data_key, data_key_end);
std::string index_key = versioned::index_key_prefix(instance_id);
std::string index_key_end = versioned::index_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), index_key, index_key_end), 2)
<< dump_range(txn_kv.get(), index_key, index_key_end);
std::string stats_key = versioned::stats_key_prefix(instance_id);
std::string stats_key_end = versioned::stats_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), stats_key, stats_key_end), 0)
<< dump_range(txn_kv.get(), stats_key, stats_key_end);
// Only the table version remains; the partition version was recycled
std::string version_key = versioned::version_key_prefix(instance_id);
std::string version_key_end = versioned::version_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), version_key, version_key_end), 1)
<< dump_range(txn_kv.get(), version_key, version_key_end);
}
}
// A test that simulates a drop table operation.
TEST(RecycleVersionedKeysTest, RecycleTable) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_table_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
// prepare index, partition, and tablets
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 4;
{
// insert some rowsets
for (size_t i = 0; i < num_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id));
}
}
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
{
// Nothing to recycle before the table is dropped
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_indexes(), 0);
ASSERT_EQ(recycler->recycle_versions(), 0);
}
ASSERT_NO_FATAL_FAILURE({
check_tablet_meta(txn_kv.get(), instance_id, tablet_id);
check_partition_resources(txn_kv.get(), instance_id, table_id, index_id, partition_id);
check_rowset_meta(txn_kv.get(), instance_id, tablet_id, 2, num_rowsets + 1);
});
{
// mark the table as deleted
ASSERT_NO_FATAL_FAILURE(
drop_index(meta_service.get(), cloud_unique_id, db_id, table_id, index_id));
}
{
// Recycle the dropped tables
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info);
ASSERT_EQ(recycler->recycle_operation_logs(), 0);
ASSERT_EQ(recycler->recycle_indexes(), 0);
ASSERT_EQ(recycler->recycle_versions(), 0);
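// Partitions whose index has been dropped are left without an owner and are
// swept as orphans.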
ASSERT_EQ(recycler->recycle_orphan_partitions(), 0);
}
{
// Assert that all data of the instance has been recycled.
std::string meta_key = versioned::meta_key_prefix(instance_id);
std::string meta_key_end = versioned::meta_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), meta_key, meta_key_end), 0)
<< dump_range(txn_kv.get(), meta_key, meta_key_end);
std::string data_key = versioned::data_key_prefix(instance_id);
std::string data_key_end = versioned::data_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), data_key, data_key_end), 0)
<< dump_range(txn_kv.get(), data_key, data_key_end);
std::string index_key = versioned::index_key_prefix(instance_id);
std::string index_key_end = versioned::index_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), index_key, index_key_end), 0)
<< dump_range(txn_kv.get(), index_key, index_key_end);
std::string stats_key = versioned::stats_key_prefix(instance_id);
std::string stats_key_end = versioned::stats_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), stats_key, stats_key_end), 0)
<< dump_range(txn_kv.get(), stats_key, stats_key_end);
std::string version_key = versioned::version_key_prefix(instance_id);
std::string version_key_end = versioned::version_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), version_key, version_key_end), 0)
<< dump_range(txn_kv.get(), version_key, version_key_end);
std::string log_key = versioned::log_key_prefix(instance_id);
std::string log_key_end = versioned::log_key_prefix(instance_id + '\x00');
EXPECT_EQ(count_range(txn_kv.get(), log_key, log_key_end), 0)
<< dump_range(txn_kv.get(), log_key, log_key_end);
}
}
// A test that simulates a recycle operation on a deleted instance.
TEST(RecycleVersionedKeysTest, RecycleDeletedInstance) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "recycle_deleted_instance_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
// prepare index, partition, and tablets
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
InstanceInfoPB instance_info;
get_instance(meta_service.get(), cloud_unique_id, instance_info);
InstanceRecycler recycler(txn_kv, instance_info, thread_group,
std::make_shared<TxnLazyCommitter>(txn_kv));
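// Constructed directly (rather than via get_instance_recycler) so the test can
// reach accessor_map_ and call recycle_deleted_instance().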
std::shared_ptr<StorageVaultAccessor> accessor = nullptr;
{
ASSERT_EQ(recycler.init(), 0);
ASSERT_EQ(recycler.accessor_map_.size(), 1);
accessor = recycler.accessor_map_.begin()->second;
ASSERT_TRUE(accessor != nullptr);
}
size_t num_rowsets = 4;
{
// insert some rowsets
for (size_t i = 0; i < num_rowsets; ++i) {
ASSERT_NO_FATAL_FAILURE(insert_rowset(
meta_service.get(), cloud_unique_id, db_id, fmt::format("label_{}", i),
table_id, partition_id, tablet_id, nullptr, accessor.get()));
}
}
std::vector<doris::RowsetMetaCloudPB> rowsets;
{
get_rowsets(meta_service.get(), cloud_unique_id, tablet_id, 0, 4, rowsets);
ASSERT_EQ(rowsets.size(), 4);
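// rowsets[0] is presumably the initial [0-1] rowset with no segment data, so
// only rowsets[1..] are checked against the accessor.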
for (int i = 1; i < rowsets.size(); ++i) {
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(
rowset_path_prefix(tablet_id, rowsets[i].rowset_id_v2()),
&list_iter));
ASSERT_TRUE(list_iter->has_next());
}
}
{
// mark the instance as deleted
ASSERT_NO_FATAL_FAILURE(drop_instance(meta_service.get(), instance_id));
}
{
// Recycle deleted instance
ASSERT_EQ(recycler.recycle_deleted_instance(), 0);
}
{
// Assert that all data of the instance has been recycled.
std::string meta_key = versioned::meta_key_prefix(instance_id);
std::string meta_key_end = versioned::meta_key_prefix(instance_id + '\x00');
ASSERT_EQ(count_range(txn_kv.get(), meta_key, meta_key_end), 0) << dump_range(txn_kv.get());
std::string data_key = versioned::data_key_prefix(instance_id);
std::string data_key_end = versioned::data_key_prefix(instance_id + '\x00');
ASSERT_EQ(count_range(txn_kv.get(), data_key, data_key_end), 0) << dump_range(txn_kv.get());
std::string index_key = versioned::index_key_prefix(instance_id);
std::string index_key_end = versioned::index_key_prefix(instance_id + '\x00');
ASSERT_EQ(count_range(txn_kv.get(), index_key, index_key_end), 0)
<< dump_range(txn_kv.get());
std::string stats_key = versioned::stats_key_prefix(instance_id);
std::string stats_key_end = versioned::stats_key_prefix(instance_id + '\x00');
ASSERT_EQ(count_range(txn_kv.get(), stats_key, stats_key_end), 0)
<< dump_range(txn_kv.get());
std::string version_key = versioned::version_key_prefix(instance_id);
std::string version_key_end = versioned::version_key_prefix(instance_id + '\x00');
ASSERT_EQ(count_range(txn_kv.get(), version_key, version_key_end), 0)
<< dump_range(txn_kv.get());
std::string log_key = versioned::log_key_prefix(instance_id);
std::string log_key_end = versioned::log_key_prefix(instance_id + '\x00');
ASSERT_EQ(count_range(txn_kv.get(), log_key, log_key_end), 0) << dump_range(txn_kv.get());
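// As before, skip rowsets[0]; the shared segment data must now be gone from
// the vault as well.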
for (int i = 1; i < rowsets.size(); ++i) {
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(
rowset_path_prefix(tablet_id, rowsets[i].rowset_id_v2()),
&list_iter));
ASSERT_FALSE(list_iter->has_next());
}
}
}
// ============================================================================
// Batch Delete Tests for recycle_versioned_tablet
// ============================================================================
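// These tests exercise recycle_tablet's batched deletion of shared rowset
// data: a rowset whose data_rowset_ref_count is 1 has its objects deleted,
// while a count above 1 is merely decremented and its data left in place.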
// Test: ref_count==1 rowsets enter batch delete plan and recycle_rowset_key is cleaned up
TEST(RecycleVersionedKeysTest, BatchDeleteRefCountOne) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "batch_delete_ref_count_one_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 5;
std::vector<std::string> rowset_ids;
std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
{
for (size_t i = 0; i < num_rowsets; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
rowset_ids.push_back(rowset_id);
// Create mock files for each rowset
accessor->put_file(segment_path(tablet_id, rowset_id, 0), "segment_data");
}
}
// All rowsets have ref_count==1 (default), should enter batch delete
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info, accessor);
RecyclerMetricsContext ctx;
ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
{
// Verify all rowset data is deleted from storage
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
EXPECT_FALSE(list_iter->has_next()) << "All rowset data should be deleted";
}
{
// Verify all recycle_rowset keys are cleaned up
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = recycle_rowset_key({instance_id, tablet_id, ""});
std::string end_key = recycle_rowset_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t count = 0;
while (it->has_next()) {
it->next();
++count;
}
EXPECT_EQ(count, 0) << "All recycle_rowset keys should be cleaned up";
}
{
// Verify all ref_count keys are cleaned up
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
std::string end_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t count = 0;
while (it->has_next()) {
it->next();
++count;
}
EXPECT_EQ(count, 0) << "All ref_count keys should be cleaned up";
}
}
// Test: ref_count>1 rowsets only decrement count, do not enter batch delete plan
TEST(RecycleVersionedKeysTest, BatchDeleteRefCountGreaterThanOne) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "batch_delete_ref_count_gt_one_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 3;
std::vector<std::string> rowset_ids;
std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
{
for (size_t i = 0; i < num_rowsets; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
rowset_ids.push_back(rowset_id);
accessor->put_file(segment_path(tablet_id, rowset_id, 0), "segment_data");
}
}
{
// Set ref_count > 1 for all rowsets (simulate shared rowsets)
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
for (const auto& rowset_id : rowset_ids) {
auto ref_count_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
txn->atomic_add(ref_count_key, 1); // ref_count becomes 2
}
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info, accessor);
RecyclerMetricsContext ctx;
ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
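// recycle_tablet reports success even though the data is retained: for shared
// rowsets, decrementing the ref count is the correct recycling outcome.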
{
// Verify rowset data is NOT deleted (ref_count > 1)
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
size_t file_count = 0;
while (list_iter->has_next()) {
list_iter->next();
++file_count;
}
EXPECT_EQ(file_count, num_rowsets)
<< "Rowset data should NOT be deleted when ref_count > 1";
}
{
// Verify ref_count is decremented to 1
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
for (const auto& rowset_id : rowset_ids) {
auto ref_count_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
std::string value;
auto rc = txn->get(ref_count_key, &value);
ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
int64_t ref_count = 0;
ASSERT_TRUE(txn->decode_atomic_int(value, &ref_count));
EXPECT_EQ(ref_count, 1) << "ref_count should be decremented to 1";
}
}
{
// Verify recycle_rowset keys are cleaned up (since ref_count > 1 path removes them)
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = recycle_rowset_key({instance_id, tablet_id, ""});
std::string end_key = recycle_rowset_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t count = 0;
while (it->has_next()) {
it->next();
++count;
}
EXPECT_EQ(count, 0) << "recycle_rowset keys should be removed for ref_count > 1 case";
}
}
// Test: Mixed ref_count scenario - some rowsets have ref_count==1, others have ref_count>1
TEST(RecycleVersionedKeysTest, BatchDeleteMixedRefCount) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "batch_delete_mixed_ref_count_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
size_t num_rowsets = 4;
std::vector<std::string> rowset_ids;
std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
{
for (size_t i = 0; i < num_rowsets; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
rowset_ids.push_back(rowset_id);
accessor->put_file(segment_path(tablet_id, rowset_id, 0), "segment_data");
}
}
// Set ref_count > 1 for first two rowsets only
std::vector<std::string> shared_rowset_ids = {rowset_ids[0], rowset_ids[1]};
std::vector<std::string> unique_rowset_ids = {rowset_ids[2], rowset_ids[3]};
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
for (const auto& rowset_id : shared_rowset_ids) {
auto ref_count_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
txn->atomic_add(ref_count_key, 1); // ref_count becomes 2
}
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info, accessor);
RecyclerMetricsContext ctx;
ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
{
// Verify only shared rowsets' data remains
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
std::set<std::string> remaining_files;
while (list_iter->has_next()) {
auto file = list_iter->next();
remaining_files.insert(file->path);
}
EXPECT_EQ(remaining_files.size(), shared_rowset_ids.size())
<< "Only shared rowsets' data should remain";
for (const auto& rowset_id : shared_rowset_ids) {
std::string expected_path = segment_path(tablet_id, rowset_id, 0);
EXPECT_TRUE(remaining_files.count(expected_path) > 0)
<< "Shared rowset data should remain: " << expected_path;
}
}
{
// Verify shared rowsets' ref_count is decremented to 1
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
for (const auto& rowset_id : shared_rowset_ids) {
auto ref_count_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
std::string value;
auto rc = txn->get(ref_count_key, &value);
ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
int64_t ref_count = 0;
ASSERT_TRUE(txn->decode_atomic_int(value, &ref_count));
EXPECT_EQ(ref_count, 1) << "Shared rowset ref_count should be 1";
}
// Verify unique rowsets' ref_count keys are deleted
for (const auto& rowset_id : unique_rowset_ids) {
auto ref_count_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id, rowset_id});
std::string value;
auto rc = txn->get(ref_count_key, &value);
EXPECT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND)
<< "Unique rowset ref_count key should be deleted";
}
}
}
// Test: Batch delete with multiple vaults (resource_ids)
TEST(RecycleVersionedKeysTest, BatchDeleteMultipleVaults) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "batch_delete_multi_vault_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
// Create two accessors for different vaults
std::string resource_id_1 = std::string(RESOURCE_ID);
std::string resource_id_2 = "mock_resource_id_2";
auto accessor_1 = std::make_shared<MockAccessor>();
auto accessor_2 = std::make_shared<MockAccessor>();
std::vector<std::string> rowset_ids;
{
// Insert rowsets - they will use RESOURCE_ID by default
for (size_t i = 0; i < 3; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
rowset_ids.push_back(rowset_id);
accessor_1->put_file(segment_path(tablet_id, rowset_id, 0), "segment_data");
}
}
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto txn_lazy_committer = std::make_shared<TxnLazyCommitter>(meta_service->txn_kv());
auto recycler = std::make_unique<InstanceRecycler>(meta_service->txn_kv(), instance_info,
thread_group, txn_lazy_committer);
// Add both accessors
recycler->TEST_add_accessor(resource_id_1, accessor_1);
recycler->TEST_add_accessor(resource_id_2, accessor_2);
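// Vault 2 holds no objects; it is registered so the multi-vault delete path is
// exercised even when one vault has nothing to delete.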
RecyclerMetricsContext ctx;
ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
{
// Verify accessor_1's data is deleted
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor_1->list_directory(tablet_path_prefix(tablet_id), &list_iter));
EXPECT_FALSE(list_iter->has_next()) << "Vault 1 data should be deleted";
}
{
// Verify all metadata is cleaned up
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
std::string end_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t count = 0;
while (it->has_next()) {
it->next();
++count;
}
EXPECT_EQ(count, 0) << "All ref_count keys should be cleaned up";
}
}
// Test: Batch size exceeds max_batch_size, verify correct batching
TEST(RecycleVersionedKeysTest, BatchDeleteExceedsMaxBatchSize) {
auto meta_service = get_meta_service();
auto txn_kv = meta_service->txn_kv();
std::string instance_id = "batch_delete_exceeds_max_batch_size_test_instance";
std::string cloud_unique_id = fmt::format("1:{}:0", instance_id);
ASSERT_NO_FATAL_FAILURE(create_and_refresh_instance(meta_service.get(), instance_id));
int64_t db_id = 1, table_id = 2, index_id = 3, partition_id = 4, tablet_id = 5;
{
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_index(meta_service.get(), cloud_unique_id, db_id,
table_id, index_id));
ASSERT_NO_FATAL_FAILURE(prepare_and_commit_partition(
meta_service.get(), cloud_unique_id, db_id, table_id, partition_id, index_id));
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), cloud_unique_id, db_id, table_id,
index_id, partition_id, tablet_id));
}
// Create enough rowsets that the delete plan should span multiple batches
// (assuming the recycler's internal batch limit is below num_rowsets;
// otherwise this still covers the single-batch path)
size_t num_rowsets = 10;
std::vector<std::string> rowset_ids;
std::shared_ptr<MockAccessor> accessor = std::make_shared<MockAccessor>();
{
for (size_t i = 0; i < num_rowsets; ++i) {
std::string rowset_id;
ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), cloud_unique_id, db_id,
fmt::format("label_{}", i), table_id,
partition_id, tablet_id, &rowset_id));
rowset_ids.push_back(rowset_id);
accessor->put_file(segment_path(tablet_id, rowset_id, 0), "segment_data");
}
}
config::force_immediate_recycle = true;
DORIS_CLOUD_DEFER {
config::force_immediate_recycle = false;
};
InstanceInfoPB instance_info;
ASSERT_NO_FATAL_FAILURE(get_instance(meta_service.get(), cloud_unique_id, instance_info));
auto recycler = get_instance_recycler(meta_service.get(), instance_info, accessor);
RecyclerMetricsContext ctx;
ASSERT_EQ(0, recycler->recycle_tablet(tablet_id, ctx));
{
// Verify all data is deleted
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
EXPECT_FALSE(list_iter->has_next()) << "All data should be deleted";
}
{
// Verify all metadata is cleaned up
std::unique_ptr<Transaction> txn;
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string begin_key = versioned::data_rowset_ref_count_key({instance_id, tablet_id, ""});
std::string end_key =
versioned::data_rowset_ref_count_key({instance_id, tablet_id + 1, ""});
std::unique_ptr<RangeGetIterator> it;
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
size_t count = 0;
while (it->has_next()) {
it->next();
++count;
}
EXPECT_EQ(count, 0) << "All ref_count keys should be cleaned up";
}
}