| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <butil/strings/string_split.h> |
| #include <fmt/core.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <gtest/gtest.h> |
| |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| |
| #include "common/config.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-store/blob_message.h" |
| #include "meta-store/document_message.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/mem_txn_kv.h" |
| #include "meta-store/meta_reader.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versioned_value.h" |
| #include "mock_resource_manager.h" |
| #include "recycler/checker.h" |
| #include "recycler/recycler.h" |
| #include "recycler/util.h" |
| |
| using namespace doris::cloud; |
| |
| extern std::string instance_id; |
| extern int64_t current_time; |
| extern doris::cloud::RecyclerThreadPoolGroup thread_group; |
| |
// Render arbitrary bytes as a printable string (used when dumping keys and values).
// A printable character is emitted as itself; any other byte is emitted as \xHH,
// where HH is the two-digit lowercase hexadecimal value of the byte.
static std::string escape_hex(std::string_view data) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string result;
    result.reserve(data.size());
    for (char c : data) {
        // Classify through unsigned char: handing isprint() a negative value (any byte
        // >= 0x80 when char is signed) is undefined behavior.
        const auto byte = static_cast<unsigned char>(c);
        if (isprint(byte)) {
            result += c;
        } else {
            result += "\\x";
            result += kHexDigits[byte >> 4];
            result += kHexDigits[byte & 0x0F];
        }
    }
    return result;
}
| |
| static size_t count_range(TxnKv* txn_kv, std::string_view begin = "", |
| std::string_view end = "\xFF") { |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| if (!txn) { |
| return 0; // Failed to create transaction |
| } |
| |
| FullRangeGetOptions opts; |
| opts.txn = txn.get(); |
| auto iter = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(opts)); |
| size_t total = 0; |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| total += 1; |
| } |
| |
| EXPECT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call. |
| return total; |
| } |
| |
| static bool is_empty_range(TxnKv* txn_kv, std::string_view begin = "", |
| std::string_view end = "\xFF") { |
| return count_range(txn_kv, begin, end) == 0; |
| } |
| |
| static std::string dump_range(TxnKv* txn_kv, std::string_view begin = "", |
| std::string_view end = "\xFF") { |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return "Failed to create dump range transaction"; |
| } |
| FullRangeGetOptions opts; |
| opts.txn = txn.get(); |
| auto iter = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(opts)); |
| std::string buffer; |
| for (auto&& kv = iter->next(); kv.has_value(); kv = iter->next()) { |
| buffer += |
| fmt::format("Key: {}, Value: {}\n", escape_hex(kv->first), escape_hex(kv->second)); |
| } |
| EXPECT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call. |
| return buffer; |
| } |
| |
| static void update_instance_info(TxnKv* txn_kv, const InstanceInfoPB& instance) { |
| std::string key = instance_key({instance_id}); |
| std::string value = instance.SerializeAsString(); |
| ASSERT_FALSE(value.empty()) << "Failed to serialize instance info"; |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction"; |
| txn->put(key, value); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction"; |
| } |
| |
| static void remove_instance_info(TxnKv* txn_kv) { |
| std::string key = instance_key({instance_id}); |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK) << "Failed to create transaction"; |
| txn->remove(key); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction"; |
| } |
| |
| // It will get the latest versioned values. |
| TxnErrorCode read_operation_log(Transaction* txn, std::string_view log_key, |
| Versionstamp* log_version, OperationLogPB* operation_log) { |
| std::string begin_key = encode_versioned_key(log_key, Versionstamp::min()); |
| std::string end_key = encode_versioned_key(log_key, Versionstamp::max()); |
| auto iter = blob_get_range(txn, begin_key, end_key); |
| if (!iter->valid()) { |
| TxnErrorCode err = iter->error_code(); |
| if (err != TxnErrorCode::TXN_OK) { |
| return err; |
| } |
| return TxnErrorCode::TXN_KEY_NOT_FOUND; |
| } |
| for (; iter->valid(); iter->next()) { |
| std::string_view key = iter->key(); |
| if (!decode_versioned_key(&key, log_version)) { |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| if (!iter->parse_value(operation_log)) { |
| return TxnErrorCode::TXN_INVALID_DATA; |
| } |
| } |
| return iter->error_code(); |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleOneOperationLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| auto* obj_info = instance.add_obj_info(); |
| obj_info->set_id("recycle_empty"); |
| obj_info->set_ak(config::test_s3_ak); |
| obj_info->set_sk(config::test_s3_sk); |
| obj_info->set_endpoint(config::test_s3_endpoint); |
| obj_info->set_region(config::test_s3_region); |
| obj_info->set_bucket(config::test_s3_bucket); |
| obj_info->set_prefix("recycle_empty"); |
| |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| { |
| // Put a empty operation log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, versionstamp, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Scan the operation logs to verify that the empty log is removed. |
| remove_instance_info(txn_kv.get()); |
| ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get()); |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleCommitPartitionLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t db_id = 1; |
| uint64_t table_id = 2; |
| uint64_t index_id = 3; |
| uint64_t partition_id = 4; |
| { |
| // Update the table version |
| std::string ver_key = versioned::table_version_key({instance_id, table_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), ver_key, ""); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp table_version; |
| TxnErrorCode err = meta_reader.get_table_version(table_id, &table_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put a commit partition log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* commit_partition = operation_log.mutable_commit_partition(); |
| commit_partition->set_db_id(db_id); |
| commit_partition->set_table_id(table_id); |
| commit_partition->add_index_ids(index_id); |
| commit_partition->add_partition_ids(partition_id); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // The above table version key should be removed. |
| { |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp table_version; |
| TxnErrorCode err = meta_reader.get_table_version(table_id, &table_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| { |
| // Put a new commit partition log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* commit_partition = operation_log.mutable_commit_partition(); |
| commit_partition->set_db_id(db_id); |
| commit_partition->set_table_id(table_id); |
| commit_partition->add_index_ids(index_id); |
| commit_partition->add_partition_ids(partition_id); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Scan the operation logs to verify that the commit partition log is removed. |
| remove_instance_info(txn_kv.get()); |
| ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get()); |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleDropPartitionLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t db_id = 1; |
| uint64_t table_id = 2; |
| uint64_t index_id = 3; |
| uint64_t partition_id = 4; |
| int64_t expiration = ::time(nullptr) + 3600; // 1 hour from now |
| |
| { |
| // Update the table version, it will be removed after the drop partition log is recycled |
| std::string ver_key = versioned::table_version_key({instance_id, table_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), ver_key, ""); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp table_version; |
| TxnErrorCode err = meta_reader.get_table_version(table_id, &table_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put a drop partition log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* drop_partition = operation_log.mutable_drop_partition(); |
| drop_partition->set_db_id(db_id); |
| drop_partition->set_table_id(table_id); |
| drop_partition->add_index_ids(index_id); |
| drop_partition->add_partition_ids(partition_id); |
| drop_partition->set_expired_at_s(expiration); |
| drop_partition->set_update_table_version( |
| true); // Update table version to ensure the table version is removed |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that the recycle partition record is created |
| { |
| std::string recycle_key = recycle_partition_key({instance_id, partition_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecyclePartitionPB recycle_partition_pb; |
| ASSERT_TRUE(recycle_partition_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_partition_pb.db_id(), db_id); |
| ASSERT_EQ(recycle_partition_pb.table_id(), table_id); |
| ASSERT_EQ(recycle_partition_pb.index_id_size(), 1); |
| ASSERT_EQ(recycle_partition_pb.index_id(0), index_id); |
| ASSERT_EQ(recycle_partition_pb.state(), RecyclePartitionPB::DROPPED); |
| ASSERT_EQ(recycle_partition_pb.expiration(), expiration); |
| } |
| |
| // The table version key should be removed because update_table_version is true |
| { |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp table_version; |
| TxnErrorCode err = meta_reader.get_table_version(table_id, &table_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| { |
| // Put a new drop partition log without updating table version |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(124, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* drop_partition = operation_log.mutable_drop_partition(); |
| drop_partition->set_db_id(db_id); |
| drop_partition->set_table_id(table_id); |
| drop_partition->add_index_ids(index_id); |
| drop_partition->add_partition_ids(partition_id + 1); // Different partition |
| drop_partition->set_expired_at_s(expiration); |
| drop_partition->set_update_table_version(false); // Don't update table version |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that the second recycle partition record is created |
| { |
| std::string recycle_key = recycle_partition_key({instance_id, partition_id + 1}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecyclePartitionPB recycle_partition_pb; |
| ASSERT_TRUE(recycle_partition_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_partition_pb.db_id(), db_id); |
| ASSERT_EQ(recycle_partition_pb.table_id(), table_id); |
| ASSERT_EQ(recycle_partition_pb.index_id_size(), 1); |
| ASSERT_EQ(recycle_partition_pb.index_id(0), index_id); |
| ASSERT_EQ(recycle_partition_pb.state(), RecyclePartitionPB::DROPPED); |
| ASSERT_EQ(recycle_partition_pb.expiration(), expiration); |
| } |
| |
| // Scan the operation logs to verify that the drop partition logs are removed. |
| remove_instance_info(txn_kv.get()); |
| |
| // Verify that the recycle partition records are created and operation logs are removed |
| { |
| // Check that the first recycle partition record exists |
| std::string recycle_key1 = recycle_partition_key({instance_id, partition_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key1, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecyclePartitionPB recycle_partition_pb; |
| ASSERT_TRUE(recycle_partition_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_partition_pb.db_id(), db_id); |
| ASSERT_EQ(recycle_partition_pb.table_id(), table_id); |
| ASSERT_EQ(recycle_partition_pb.index_id_size(), 1); |
| ASSERT_EQ(recycle_partition_pb.index_id(0), index_id); |
| ASSERT_EQ(recycle_partition_pb.state(), RecyclePartitionPB::DROPPED); |
| ASSERT_EQ(recycle_partition_pb.expiration(), expiration); |
| } |
| |
| { |
| // Check that the second recycle partition record exists |
| std::string recycle_key2 = recycle_partition_key({instance_id, partition_id + 1}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key2, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecyclePartitionPB recycle_partition_pb; |
| ASSERT_TRUE(recycle_partition_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_partition_pb.db_id(), db_id); |
| ASSERT_EQ(recycle_partition_pb.table_id(), table_id); |
| ASSERT_EQ(recycle_partition_pb.index_id_size(), 1); |
| ASSERT_EQ(recycle_partition_pb.index_id(0), index_id); |
| ASSERT_EQ(recycle_partition_pb.state(), RecyclePartitionPB::DROPPED); |
| ASSERT_EQ(recycle_partition_pb.expiration(), expiration); |
| } |
| |
| // Verify that operation logs are removed (only recycle partition records should remain) |
| ASSERT_EQ(count_range(txn_kv.get()), 2) << "Should only have 2 recycle partition records"; |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleCommitIndexLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t db_id = 1; |
| uint64_t table_id = 2; |
| uint64_t index_id = 3; |
| { |
| // Update the table version |
| std::string ver_key = versioned::table_version_key({instance_id, table_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), ver_key, ""); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp table_version; |
| TxnErrorCode err = meta_reader.get_table_version(table_id, &table_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put a commit index log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* commit_index = operation_log.mutable_commit_index(); |
| commit_index->set_db_id(db_id); |
| commit_index->set_table_id(table_id); |
| commit_index->add_index_ids(index_id); |
| commit_index->set_update_table_version(true); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // The above table version key should be removed. |
| { |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp table_version; |
| TxnErrorCode err = meta_reader.get_table_version(table_id, &table_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| |
| { |
| // Put a new commit index log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* commit_index = operation_log.mutable_commit_index(); |
| commit_index->set_db_id(db_id); |
| commit_index->set_table_id(table_id); |
| commit_index->add_index_ids(index_id); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Scan the operation logs to verify that the commit index log is removed. |
| remove_instance_info(txn_kv.get()); |
| ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get()); |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleDropIndexLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t db_id = 1; |
| uint64_t table_id = 2; |
| uint64_t index_id = 3; |
| int64_t expiration = ::time(nullptr) + 3600; // 1 hour from now |
| |
| { |
| // Put a drop index log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* drop_index = operation_log.mutable_drop_index(); |
| drop_index->set_db_id(db_id); |
| drop_index->set_table_id(table_id); |
| drop_index->add_index_ids(index_id); |
| drop_index->set_expiration(expiration); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that the recycle index record is created |
| { |
| std::string recycle_key = recycle_index_key({instance_id, index_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecycleIndexPB recycle_index_pb; |
| ASSERT_TRUE(recycle_index_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_index_pb.db_id(), db_id); |
| ASSERT_EQ(recycle_index_pb.table_id(), table_id); |
| ASSERT_EQ(recycle_index_pb.state(), RecycleIndexPB::DROPPED); |
| ASSERT_EQ(recycle_index_pb.expiration(), expiration); |
| } |
| |
| // Scan the operation logs to verify that the drop index logs are removed. |
| remove_instance_info(txn_kv.get()); |
| |
| // Verify that the recycle index records are created and operation logs are removed |
| { |
| // Check that the first recycle index record exists |
| std::string recycle_key1 = recycle_index_key({instance_id, index_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key1, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecycleIndexPB recycle_index_pb; |
| ASSERT_TRUE(recycle_index_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_index_pb.db_id(), db_id); |
| ASSERT_EQ(recycle_index_pb.table_id(), table_id); |
| ASSERT_EQ(recycle_index_pb.state(), RecycleIndexPB::DROPPED); |
| ASSERT_EQ(recycle_index_pb.expiration(), expiration); |
| } |
| |
| // Verify that operation logs are removed (only recycle index records should remain) |
| ASSERT_EQ(count_range(txn_kv.get()), 1) << "Should only have 2 recycle index records"; |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleCommitTxnLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t db_id = 1; |
| uint64_t table_id = 2; |
| uint64_t partition_id = 4; |
| uint64_t tablet_id = 5; |
| uint64_t txn_id = 12345; |
| |
| // Create TxnInfo with VISIBLE status |
| { |
| TxnInfoPB txn_info; |
| txn_info.set_db_id(db_id); |
| txn_info.set_txn_id(txn_id); |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE); |
| txn_info.set_label("test_label"); |
| |
| std::string key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string txn_info_value = txn_info.SerializeAsString(); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(key, txn_info_value); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create previous partition version to be recycled |
| { |
| std::string partition_version_key = |
| versioned::partition_version_key({instance_id, partition_id}); |
| VersionPB version_pb; |
| version_pb.set_version(100); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create previous tablet load stats to be recycled |
| { |
| std::string tablet_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id}); |
| TabletStatsPB stats_pb; |
| // TabletStatsPB typically doesn't have tablet_id field, just create empty stats |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), tablet_stats_key, stats_pb.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create previous table version to be recycled |
| { |
| std::string table_version_key = versioned::table_version_key({instance_id, table_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), table_version_key, ""); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create versioned rowset meta (should NOT be recycled) |
| { |
| std::string versioned_rowset_key = |
| versioned::meta_rowset_load_key({instance_id, tablet_id, 100}); |
| doris::RowsetMetaCloudPB rowset_meta; |
| rowset_meta.set_rowset_id(123456); |
| rowset_meta.set_tablet_id(tablet_id); |
| rowset_meta.set_partition_id(partition_id); |
| rowset_meta.set_start_version(100); |
| rowset_meta.set_end_version(100); |
| rowset_meta.set_num_rows(1000); |
| rowset_meta.set_total_disk_size(1024 * 1024); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_TRUE( |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset_meta))); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Put a commit txn log |
| { |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| |
| auto* commit_txn = operation_log.mutable_commit_txn(); |
| commit_txn->set_txn_id(txn_id); |
| commit_txn->set_db_id(db_id); |
| commit_txn->add_table_ids(table_id); |
| |
| // Add partition version mapping |
| auto& partition_version_map = *commit_txn->mutable_partition_version_map(); |
| partition_version_map[partition_id] = 101; // New version |
| |
| // Add tablet to partition mapping |
| auto& tablet_to_partition_map = *commit_txn->mutable_tablet_to_partition_map(); |
| tablet_to_partition_map[tablet_id] = partition_id; |
| |
| // Add recycle txn info |
| auto* recycle_txn = commit_txn->mutable_recycle_txn(); |
| recycle_txn->set_label("test_label"); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that the recycle txn record is created |
| { |
| std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| RecycleTxnPB recycle_txn_pb; |
| ASSERT_TRUE(recycle_txn_pb.ParseFromString(value)); |
| ASSERT_EQ(recycle_txn_pb.label(), "test_label"); |
| ASSERT_GT(recycle_txn_pb.creation_time(), 0); |
| } |
| |
| // Verify that previous versions are recycled (should not be found with current snapshot) |
| { |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| |
| // Previous partition version should be marked for removal |
| Versionstamp partition_version; |
| (void)meta_reader.get_partition_version(partition_id, nullptr, &partition_version); |
| // Since we only have one version, it should still exist but the previous one should be scheduled for removal |
| |
| // Previous tablet stats should be marked for removal |
| Versionstamp tablet_version; |
| (void)meta_reader.get_tablet_load_stats(tablet_id, nullptr, &tablet_version); |
| |
| // Previous table version should be marked for removal |
| Versionstamp table_version; |
| (void)meta_reader.get_table_version(table_id, &table_version); |
| } |
| |
| // Scan the operation logs to verify that the commit txn log is removed |
| remove_instance_info(txn_kv.get()); |
| |
| // Should have TxnInfoPB + RecycleTxnPB + RowsetMetaCloudPB = 3 records |
| ASSERT_EQ(count_range(txn_kv.get()), 3) |
| << "Should have TxnInfoPB, RecycleTxnPB and RowsetMetaCloudPB records"; |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleCommitTxnLogWhenTxnIsNotVisible) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t db_id = 1; |
| uint64_t table_id = 2; |
| uint64_t partition_id = 4; |
| uint64_t tablet_id = 5; |
| uint64_t txn_id = 12345; |
| |
| // Create TxnInfo with COMMITTED status (not VISIBLE) |
| { |
| TxnInfoPB txn_info; |
| txn_info.set_db_id(db_id); |
| txn_info.set_txn_id(txn_id); |
| txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED); // Not VISIBLE |
| txn_info.set_label("test_label"); |
| |
| std::string key = txn_info_key({instance_id, db_id, txn_id}); |
| std::string txn_info_value = txn_info.SerializeAsString(); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(key, txn_info_value); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create previous partition version |
| { |
| std::string partition_version_key = |
| versioned::partition_version_key({instance_id, partition_id}); |
| VersionPB version_pb; |
| version_pb.set_version(100); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Put a commit txn log |
| { |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| |
| auto* commit_txn = operation_log.mutable_commit_txn(); |
| commit_txn->set_txn_id(txn_id); |
| commit_txn->set_db_id(db_id); |
| commit_txn->add_table_ids(table_id); |
| |
| // Add partition version mapping |
| auto& partition_version_map = *commit_txn->mutable_partition_version_map(); |
| partition_version_map[partition_id] = 101; |
| |
| // Add tablet to partition mapping |
| auto& tablet_to_partition_map = *commit_txn->mutable_tablet_to_partition_map(); |
| tablet_to_partition_map[tablet_id] = partition_id; |
| |
| // Add recycle txn info |
| auto* recycle_txn = commit_txn->mutable_recycle_txn(); |
| recycle_txn->set_label("test_label"); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that NO recycle txn record is created (because txn is not VISIBLE) |
| { |
| std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id}); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string value; |
| TxnErrorCode err = txn->get(recycle_key, &value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Recycle txn record should not exist for non-VISIBLE transactions"; |
| } |
| |
| // Verify that the original partition version still exists (not recycled) |
| { |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp partition_version; |
| TxnErrorCode err = |
| meta_reader.get_partition_version(partition_id, nullptr, &partition_version); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK) |
| << "Partition version should still exist for non-VISIBLE transactions"; |
| } |
| |
| // Scan the operation logs to verify that the commit txn log is removed |
| remove_instance_info(txn_kv.get()); |
| |
| // Should have TxnInfoPB + partition_version + RowsetMetaCloudPB = 3 records (no recycling for non-VISIBLE txn) |
| ASSERT_EQ(count_range(txn_kv.get()), 3) |
| << "Should have TxnInfoPB, partition version and RowsetMetaCloudPB records"; |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleUpdateTabletLog) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| uint64_t tablet_id = 1; |
| { |
| // Update the tablet meta |
| std::string tablet_meta_key = versioned::meta_tablet_key({instance_id, tablet_id}); |
| doris::TabletMetaCloudPB tablet_meta; |
| tablet_meta.set_tablet_id(tablet_id); |
| tablet_meta.set_creation_time(::time(nullptr)); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), tablet_meta_key, tablet_meta.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| Versionstamp versionstamp; |
| TxnErrorCode err = meta_reader.get_tablet_meta(tablet_id, &tablet_meta, &versionstamp); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put an update tablet log |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| auto* update_tablet = operation_log.mutable_update_tablet(); |
| update_tablet->add_tablet_ids(tablet_id); |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = txn_kv->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Recycle the operation logs. |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // The tablet meta should be removed |
| { |
| MetaReader meta_reader(instance_id, txn_kv.get()); |
| doris::TabletMetaCloudPB tablet_meta; |
| Versionstamp versionstamp; |
| TxnErrorCode err = meta_reader.get_tablet_meta(tablet_id, &tablet_meta, &versionstamp); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| // The operation log should be removed |
| remove_instance_info(txn_kv.get()); |
| ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get()); |
| } |
| |
| std::unique_ptr<MetaServiceProxy> get_meta_service(bool mock_resource_mgr) { |
| int ret = 0; |
| // MemKv |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| if (txn_kv != nullptr) { |
| ret = txn_kv->init(); |
| [&] { ASSERT_EQ(ret, 0); }(); |
| } |
| [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); |
| |
| // FdbKv |
| // config::fdb_cluster_file_path = "fdb.cluster"; |
| // static auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<FdbTxnKv>()); |
| // static std::atomic<bool> init {false}; |
| // bool tmp = false; |
| // if (init.compare_exchange_strong(tmp, true)) { |
| // int ret = txn_kv->init(); |
| // [&] { ASSERT_EQ(ret, 0); ASSERT_NE(txn_kv.get(), nullptr); }(); |
| // } |
| |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| auto rs = mock_resource_mgr ? std::make_shared<MockResourceManager>(txn_kv) |
| : std::make_shared<ResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto snapshot = std::make_shared<SnapshotManager>(txn_kv); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl, snapshot); |
| return std::make_unique<MetaServiceProxy>(std::move(meta_service)); |
| } |
| |
| namespace doris::cloud { |
// Returns a new rowset id on every call: the decimal text of a counter that
// starts at 1 and grows by one per call ("1", "2", "3", ...).
static std::string next_rowset_id() {
    static int last_issued = 0;
    last_issued += 1;
    return std::to_string(last_issued);
}
| |
| void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, int64_t partition_id, |
| int64_t tablet_id) { |
| auto tablet = req.add_tablet_metas(); |
| tablet->set_table_id(table_id); |
| tablet->set_index_id(index_id); |
| tablet->set_partition_id(partition_id); |
| tablet->set_tablet_id(tablet_id); |
| auto schema = tablet->mutable_schema(); |
| schema->set_schema_version(0); |
| auto first_rowset = tablet->add_rs_metas(); |
| first_rowset->set_rowset_id(0); // required |
| first_rowset->set_rowset_id_v2(next_rowset_id()); |
| first_rowset->set_start_version(0); |
| first_rowset->set_end_version(1); |
| first_rowset->mutable_tablet_schema()->CopyFrom(*schema); |
| } |
| |
| void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(1); |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id = 10, |
| int64_t version = -1, int num_rows = 100) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_partition_id(partition_id); |
| rowset.set_txn_id(txn_id); |
| if (version > 0) { |
| rowset.set_start_version(version); |
| rowset.set_end_version(version); |
| } |
| rowset.set_num_segments(1); |
| rowset.set_num_rows(num_rows); |
| rowset.set_data_disk_size(num_rows * 100); |
| rowset.set_index_disk_size(num_rows * 10); |
| rowset.set_total_disk_size(num_rows * 110); |
| rowset.mutable_tablet_schema()->set_schema_version(0); |
| auto* key_bounds = rowset.add_segments_key_bounds(); |
| key_bounds->set_min_key(fmt::format("min-key-{}", rowset.rowset_id_v2())); |
| key_bounds->set_max_key(fmt::format("max-key-{}", rowset.rowset_id_v2())); |
| rowset.set_segments_key_bounds_truncated(false); |
| rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK |
| return rowset; |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleCompactionLog) { |
| // Ensure strip behavior is enabled for this test |
| auto old_flag = config::enable_recycle_rowset_strip_key_bounds; |
| config::enable_recycle_rowset_strip_key_bounds = true; |
| DORIS_CLOUD_DEFER { |
| config::enable_recycle_rowset_strip_key_bounds = old_flag; |
| }; |
| |
| auto meta_service = get_meta_service(false); |
| std::string test_instance_id = "recycle_compaction_log_test"; |
| auto* sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = test_instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("check_lazy_txn_finished::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("delete_rowset_data::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("recycle_tablet::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = false; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| constexpr int64_t table_id = 20001; |
| constexpr int64_t index_id = 20002; |
| constexpr int64_t partition_id = 20003; |
| constexpr int64_t tablet_id = 20004; |
| |
| { |
| // write instance |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(test_instance_id); |
| instance_info.set_multi_version_status(MULTI_VERSION_ENABLED); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(instance_key(test_instance_id), instance_info.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| meta_service->resource_mgr()->refresh_instance(test_instance_id); |
| ASSERT_TRUE(meta_service->resource_mgr()->is_version_write_enabled(test_instance_id)); |
| } |
| |
| { |
| // Create tablet first |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| } |
| |
| // Create input rowsets for compaction (versions 2-4) |
| std::vector<doris::RowsetMetaCloudPB> input_rowsets; |
| auto txn_kv = meta_service->txn_kv(); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (int i = 0; i < 3; ++i) { |
| auto rowset = create_rowset(i + 100, tablet_id, partition_id, i + 2, 50 * (i + 1)); |
| input_rowsets.push_back(rowset); |
| |
| // Put rowset directly to meta storage |
| auto rowset_key = meta_rowset_key({test_instance_id, tablet_id, rowset.end_version()}); |
| auto rowset_val = rowset.SerializeAsString(); |
| txn->put(rowset_key, rowset_val); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {test_instance_id, tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create output rowset as tmp rowset |
| constexpr int64_t txn_id = 30001; |
| constexpr int64_t output_start_version = 2; |
| constexpr int64_t output_end_version = 4; |
| auto output_rowset = create_rowset(200, tablet_id, partition_id, output_start_version, 100); |
| output_rowset.set_end_version(output_end_version); // Set end version to create 2-4 range |
| output_rowset.set_txn_id(txn_id); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Put tmp rowset |
| auto tmp_rowset_key = meta_rowset_tmp_key({test_instance_id, txn_id, tablet_id}); |
| auto tmp_rowset_val = output_rowset.SerializeAsString(); |
| txn->put(tmp_rowset_key, tmp_rowset_val); |
| |
| // Create initial tablet stats |
| TabletStatsPB initial_stats; |
| initial_stats.set_num_rows(150); // Total from input rowsets |
| initial_stats.set_data_size(150 * 50); |
| initial_stats.set_num_rowsets(3); |
| initial_stats.set_num_segments(3); |
| initial_stats.set_index_size(100); |
| initial_stats.set_segment_size(200); |
| initial_stats.set_cumulative_point(1); |
| |
| auto stats_key = |
| stats_tablet_key({test_instance_id, table_id, index_id, partition_id, tablet_id}); |
| auto stats_val = initial_stats.SerializeAsString(); |
| txn->put(stats_key, stats_val); |
| auto versioned_load_stats_key = |
| versioned::tablet_load_stats_key({test_instance_id, tablet_id}); |
| versioned_put(txn.get(), versioned_load_stats_key, stats_val); |
| |
| // Create tablet compact stats for versioned storage |
| auto tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({test_instance_id, tablet_id}); |
| auto tablet_compact_stats_val = initial_stats.SerializeAsString(); |
| versioned_put(txn.get(), tablet_compact_stats_key, tablet_compact_stats_val); |
| |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Start compaction job first |
| const std::string job_id = "test_compaction_job"; |
| const std::string initiator = "test_be"; |
| |
| { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| StartTabletJobResponse res; |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| auto compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id(job_id); |
| compaction->set_initiator(initiator); |
| compaction->set_type(TabletCompactionJobPB::CUMULATIVE); |
| compaction->set_base_compaction_cnt(0); |
| compaction->set_cumulative_compaction_cnt(0); |
| compaction->add_input_versions(2); |
| compaction->add_input_versions(4); |
| long now = time(nullptr); |
| compaction->set_expiration(now + 12); |
| compaction->set_lease(now + 3); |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Now use real finish_tablet_job to trigger process_compaction_job |
| { |
| brpc::Controller cntl; |
| FinishTabletJobRequest req; |
| FinishTabletJobResponse res; |
| |
| req.set_action(FinishTabletJobRequest::COMMIT); |
| req.mutable_job()->mutable_idx()->set_table_id(table_id); |
| req.mutable_job()->mutable_idx()->set_index_id(index_id); |
| req.mutable_job()->mutable_idx()->set_partition_id(partition_id); |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| |
| auto compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id(job_id); |
| compaction->set_initiator(initiator); |
| compaction->set_type(TabletCompactionJobPB::CUMULATIVE); |
| |
| // Input versions and rowsets |
| compaction->add_input_versions(2); |
| compaction->add_input_versions(4); |
| |
| // Output information |
| compaction->add_txn_id(txn_id); |
| compaction->add_output_versions(output_end_version); |
| compaction->add_output_rowset_ids(output_rowset.rowset_id_v2()); |
| compaction->set_output_cumulative_point(5); |
| |
| // Compaction stats for updating tablet stats |
| compaction->set_size_input_rowsets(150 * 50); // Size of input rowsets |
| compaction->set_index_size_input_rowsets(100); |
| compaction->set_segment_size_input_rowsets(200); |
| compaction->set_num_input_rows(150); |
| compaction->set_num_input_rowsets(3); |
| compaction->set_num_input_segments(3); |
| |
| compaction->set_size_output_rowsets(100 * 50); // Size of output rowset |
| compaction->set_index_size_output_rowsets(50); |
| compaction->set_segment_size_output_rowsets(100); |
| compaction->set_num_output_rows(100); |
| compaction->set_num_output_rowsets(1); |
| compaction->set_num_output_segments(1); |
| |
| // This will trigger process_compaction_job internally and create compaction log |
| meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Verify compaction log was created |
| Versionstamp log_version; |
| OperationLogPB operation_log; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string log_key = versioned::log_key({test_instance_id}); |
| ASSERT_EQ(read_operation_log(txn.get(), log_key, &log_version, &operation_log), |
| TxnErrorCode::TXN_OK); |
| |
| ASSERT_TRUE(operation_log.has_compaction()); |
| // Ensure input rowsets carried key bounds before strip |
| ASSERT_GT(input_rowsets.front().segments_key_bounds_size(), 0); |
| |
| const auto& compaction_log = operation_log.compaction(); |
| ASSERT_EQ(compaction_log.tablet_id(), tablet_id); |
| ASSERT_EQ(compaction_log.start_version(), 2); |
| ASSERT_EQ(compaction_log.end_version(), 4); |
| ASSERT_EQ(compaction_log.recycle_rowsets_size(), 3); |
| for (const auto& recycle_rs : compaction_log.recycle_rowsets()) { |
| ASSERT_EQ(recycle_rs.rowset_meta().segments_key_bounds_size(), 0); |
| ASSERT_FALSE(recycle_rs.rowset_meta().has_segments_key_bounds_truncated()); |
| } |
| } |
| |
| // Set up recycler using the same txn_kv as meta_service |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(test_instance_id); |
| instance_info.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| |
| InstanceRecycler recycler(txn_kv, instance_info, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| // Now recycle the compaction operation log |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that input rowsets are converted to recycle rowsets |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (const auto& input_rowset : input_rowsets) { |
| // Check that recycle rowset was created |
| std::string recycle_key = |
| recycle_rowset_key({test_instance_id, tablet_id, input_rowset.rowset_id_v2()}); |
| std::string recycle_value; |
| TxnErrorCode err = txn->get(recycle_key, &recycle_value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK) |
| << "Recycle rowset should exist for rowset " << input_rowset.rowset_id_v2(); |
| |
| RecycleRowsetPB recycle_rowset_pb; |
| ASSERT_TRUE(recycle_rowset_pb.ParseFromString(recycle_value)); |
| ASSERT_EQ(recycle_rowset_pb.rowset_meta().rowset_id_v2(), input_rowset.rowset_id_v2()); |
| ASSERT_EQ(recycle_rowset_pb.rowset_meta().tablet_id(), tablet_id); |
| |
| // Check that compact and load keys don't exist (they never existed in this test scenario) |
| std::string meta_rowset_compact_key = versioned::meta_rowset_compact_key( |
| {test_instance_id, tablet_id, input_rowset.end_version()}); |
| std::string meta_rowset_load_key = versioned::meta_rowset_load_key( |
| {test_instance_id, tablet_id, input_rowset.end_version()}); |
| |
| RowsetMetaCloudPB compact_pb, load_pb; |
| Versionstamp compact_vs, load_vs; |
| if (input_rowset.end_version() == output_end_version) { |
| ASSERT_EQ(versioned::document_get(txn.get(), meta_rowset_compact_key, &compact_pb, |
| &compact_vs), |
| TxnErrorCode::TXN_OK) |
| << "Output rowset compact meta should exist"; |
| |
| // Check that this pb is indeed the 2-4 output rowset |
| ASSERT_EQ(compact_pb.start_version(), output_start_version) |
| << "Output rowset should have start_version = " << output_start_version; |
| ASSERT_EQ(compact_pb.end_version(), output_end_version) |
| << "Output rowset should have end_version = " << output_end_version; |
| ASSERT_EQ(compact_pb.tablet_id(), tablet_id) |
| << "Output rowset should have correct tablet_id"; |
| ASSERT_EQ(compact_pb.rowset_id_v2(), output_rowset.rowset_id_v2()) |
| << "Output rowset should have correct rowset_id_v2"; |
| |
| } else { |
| ASSERT_EQ(versioned::document_get(txn.get(), meta_rowset_compact_key, &compact_pb, |
| &compact_vs), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Input rowset compact meta should not exist"; |
| ASSERT_EQ(versioned::document_get(txn.get(), meta_rowset_load_key, &load_pb, |
| &load_vs), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Input rowset load meta should not exist"; |
| } |
| } |
| } |
| |
| // Verify tablet compact stats still exist (recycling compaction log should not delete them) |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| std::string tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({test_instance_id, tablet_id}); |
| std::string tablet_compact_stats_value; |
| Versionstamp* versionstamp = nullptr; |
| TxnErrorCode err = versioned_get(txn.get(), tablet_compact_stats_key, versionstamp, |
| &tablet_compact_stats_value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK) |
| << "Tablet compact stats should still exist after recycling compaction log"; |
| } |
| |
| // Verify the operation log is removed |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| std::string log_key_with_version = |
| encode_versioned_key(versioned::log_key(test_instance_id), log_version); |
| std::string log_value; |
| TxnErrorCode err = txn->get(log_key_with_version, &log_value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Operation log should be removed after recycling"; |
| } |
| |
| // Now test second compaction to verify meta_rowset_compact_key deletion |
| // Add version 5 rowset for second compaction and update tablet stats |
| auto version5_rowset = create_rowset(300, tablet_id, partition_id, 5, 50); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Add version 5 rowset |
| auto rowset_key_5 = meta_rowset_key({test_instance_id, tablet_id, 5}); |
| txn->put(rowset_key_5, version5_rowset.SerializeAsString()); |
| |
| // Update tablet stats to include version 5 rowset |
| TabletStatsPB updated_stats; |
| updated_stats.set_num_rows(150); // 100 from 2-4 + 50 from version 5 |
| updated_stats.set_data_size(150 * 100); // Updated data size |
| updated_stats.set_num_rowsets(2); // 2-4 rowset + version 5 rowset |
| updated_stats.set_num_segments(2); |
| updated_stats.set_index_size(150); |
| updated_stats.set_segment_size(300); |
| updated_stats.set_cumulative_point(5); // Updated cumulative point |
| |
| auto stats_key = |
| stats_tablet_key({test_instance_id, table_id, index_id, partition_id, tablet_id}); |
| txn->put(stats_key, updated_stats.SerializeAsString()); |
| |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create second output rowset as tmp rowset (2-5) |
| constexpr int64_t second_txn_id = 30002; |
| constexpr int64_t second_output_end_version = 5; |
| auto second_output_rowset = create_rowset(400, tablet_id, partition_id, 2, 200); |
| second_output_rowset.set_end_version(second_output_end_version); |
| second_output_rowset.set_txn_id(second_txn_id); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| auto tmp_rowset_key = meta_rowset_tmp_key({test_instance_id, second_txn_id, tablet_id}); |
| txn->put(tmp_rowset_key, second_output_rowset.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Start second compaction job (2-4 + 5-5 -> 2-5) |
| const std::string second_job_id = "test_second_compaction_job"; |
| { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| StartTabletJobResponse res; |
| req.mutable_job()->mutable_idx()->set_table_id(table_id); |
| req.mutable_job()->mutable_idx()->set_index_id(index_id); |
| req.mutable_job()->mutable_idx()->set_partition_id(partition_id); |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| auto* compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id(second_job_id); |
| compaction->set_initiator(initiator); |
| compaction->set_type(TabletCompactionJobPB::CUMULATIVE); |
| compaction->set_base_compaction_cnt(0); |
| compaction->set_cumulative_compaction_cnt(1); // Already did one compaction |
| compaction->add_input_versions(2); // 2-4 range |
| compaction->add_input_versions(5); // + version 5 |
| long now = time(nullptr); |
| compaction->set_expiration(now + 12); |
| compaction->set_lease(now + 3); |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Finish second compaction job to create second compaction log |
| { |
| brpc::Controller cntl; |
| FinishTabletJobRequest req; |
| FinishTabletJobResponse res; |
| |
| req.set_action(FinishTabletJobRequest::COMMIT); |
| req.mutable_job()->mutable_idx()->set_table_id(table_id); |
| req.mutable_job()->mutable_idx()->set_index_id(index_id); |
| req.mutable_job()->mutable_idx()->set_partition_id(partition_id); |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| |
| auto* compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id(second_job_id); |
| compaction->set_initiator(initiator); |
| compaction->set_type(TabletCompactionJobPB::CUMULATIVE); |
| |
| // Input: 2-4 and 5-5 |
| compaction->add_input_versions(2); |
| compaction->add_input_versions(5); |
| |
| // Output: 2-5 |
| compaction->add_txn_id(second_txn_id); |
| compaction->add_output_versions(second_output_end_version); |
| compaction->add_output_rowset_ids(second_output_rowset.rowset_id_v2()); |
| compaction->set_output_cumulative_point(6); |
| |
| // Compaction stats |
| compaction->set_size_input_rowsets(150 * 50); |
| compaction->set_index_size_input_rowsets(150); |
| compaction->set_segment_size_input_rowsets(300); |
| compaction->set_num_input_rows(150); |
| compaction->set_num_input_rowsets(2); |
| compaction->set_num_input_segments(2); |
| |
| compaction->set_size_output_rowsets(200 * 50); |
| compaction->set_index_size_output_rowsets(100); |
| compaction->set_segment_size_output_rowsets(200); |
| compaction->set_num_output_rows(200); |
| compaction->set_num_output_rowsets(1); |
| compaction->set_num_output_segments(1); |
| |
| // This creates the second compaction log |
| meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Verify that 2-4 meta_rowset_compact_key exists before recycling |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| auto compact_key_24 = versioned::meta_rowset_compact_key({test_instance_id, tablet_id, 4}); |
| RowsetMetaCloudPB compact_pb_24; |
| Versionstamp compact_vs_24; |
| ASSERT_EQ( |
| versioned::document_get(txn.get(), compact_key_24, &compact_pb_24, &compact_vs_24), |
| TxnErrorCode::TXN_OK) |
| << "2-4 rowset compact meta should exist before second recycling"; |
| // Check that this pb is indeed the 2-4 rowset |
| ASSERT_EQ(compact_pb_24.start_version(), 2) << "Should be 2-4 rowset"; |
| ASSERT_EQ(compact_pb_24.end_version(), 4) << "Should be 2-4 rowset"; |
| ASSERT_EQ(compact_pb_24.rowset_id_v2(), output_rowset.rowset_id_v2()) |
| << "Should match output rowset ID"; |
| } |
| |
| // Recycle the second compaction log using the same recycler |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify that 2-4 meta_rowset_compact_key is now deleted |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| auto compact_key_24 = versioned::meta_rowset_compact_key({test_instance_id, tablet_id, 4}); |
| RowsetMetaCloudPB compact_pb_24; |
| Versionstamp compact_vs_24; |
| ASSERT_EQ( |
| versioned::document_get(txn.get(), compact_key_24, &compact_pb_24, &compact_vs_24), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "2-4 rowset compact meta should be deleted after recycling second compaction " |
| "log"; |
| |
| // Verify that recycle rowset was created for 2-4 |
| std::string recycle_key_24 = |
| recycle_rowset_key({test_instance_id, tablet_id, output_rowset.rowset_id_v2()}); |
| std::string recycle_value_24; |
| ASSERT_EQ(txn->get(recycle_key_24, &recycle_value_24), TxnErrorCode::TXN_OK) |
| << "Recycle rowset should exist for 2-4 rowset"; |
| |
| RecycleRowsetPB recycle_pb_24; |
| ASSERT_TRUE(recycle_pb_24.ParseFromString(recycle_value_24)); |
| ASSERT_EQ(recycle_pb_24.rowset_meta().start_version(), 2); |
| ASSERT_EQ(recycle_pb_24.rowset_meta().end_version(), 4); |
| ASSERT_EQ(recycle_pb_24.rowset_meta().rowset_id_v2(), output_rowset.rowset_id_v2()); |
| } |
| |
| // Finally, test tablet recycling to verify tablet compact stats deletion |
| // Now recycle tablets to clean up tablet compact stats |
| RecyclerMetricsContext ctx(instance_id, "test"); |
| ASSERT_EQ(recycler.recycle_tablets(table_id, index_id, ctx), 0); |
| |
| // Verify tablet compact stats are now deleted (recycling tablets should delete them) |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| std::string tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({test_instance_id, tablet_id}); |
| std::string tablet_compact_stats_value; |
| Versionstamp* versionstamp = nullptr; |
| TxnErrorCode err = versioned_get(txn.get(), tablet_compact_stats_key, versionstamp, |
| &tablet_compact_stats_value); |
| ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Tablet compact stats should be deleted after recycling tablets"; |
| } |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleSchemaChangeLog) { |
| // ========================================================================= |
| // Recycle Schema Change Operation Log Test |
| // ========================================================================= |
| // Drives a complete schema change through the meta service and then checks that: |
| // 1. finish_tablet_job adds an operation log whose recycle rowset metas carry no segment key bounds |
| // 2. recycler.recycle_operation_logs succeeds and leaves the expected rowset / recycle / compact keys |
| // 3. recycler.recycle_tablets removes the versioned meta tablet keys and tablet load stats keys |
| // |
| // Test Scenario (copied from SchemaChangeLog): |
| // - Old tablet starts with versions [2-2, 3-3] (20+30=50 rows) |
| // - Before the job is started, both tablets receive versions [4-4, 5-5] (alter version = 5, 40+50=90 rows) |
| // - After schema change starts, both tablets receive versions [6-6, 7-7] (60+70=130 rows) |
| // - Expected behavior when finishing schema change: |
| // * Old tablet's [2-2, 3-3, 4-4, 5-5] data is converted to tmp rowsets in new tablet |
| // * New tablet deletes its own [4-4, 5-5] rowsets and converts tmp rowsets to permanent |
| // * New tablet keeps [6-6, 7-7] unchanged |
| // * Schema change net effect: add [2-2, 3-3] data to new tablet's tablet_load_stats |
| // * Multi-version keys are properly created for new tablet |
| // - Then run the recycler over the operation logs and finally over the tablets of the index |
| // ========================================================================= |
| auto old_flag = config::enable_recycle_rowset_strip_key_bounds; |
| config::enable_recycle_rowset_strip_key_bounds = true; |
| DORIS_CLOUD_DEFER { |
| config::enable_recycle_rowset_strip_key_bounds = old_flag; |
| }; |
| |
| // Step 1: Build the meta service and install sync-point callbacks (cleared again by the deferred block below) |
| auto meta_service = get_meta_service(false); |
| std::string instance_id = "recycle_schema_change_test"; |
| auto* sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("check_lazy_txn_finished::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("delete_rowset_data::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("recycle_tablet::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = false; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| constexpr int64_t table_id = 50001; |
| constexpr int64_t index_id = 50002; |
| constexpr int64_t partition_id = 50003; |
| constexpr int64_t old_tablet_id = 50004; |
| constexpr int64_t new_tablet_id = 50005; |
| |
| // Register the instance with MULTI_VERSION_ENABLED and confirm the resource manager reports version writes enabled |
| { |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(instance_id); |
| instance_info.set_multi_version_status(MULTI_VERSION_ENABLED); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(instance_key(instance_id), instance_info.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| meta_service->resource_mgr()->refresh_instance(instance_id); |
| ASSERT_TRUE(meta_service->resource_mgr()->is_version_write_enabled(instance_id)); |
| } |
| |
| // Create the old tablet (the source side of the schema change) |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, old_tablet_id); |
| |
| auto txn_kv = meta_service->txn_kv(); |
| |
| // Create initial rowsets for old tablet: versions 2, 3 (NOTE(review): initial_rowsets is filled but never read afterwards) |
| std::vector<doris::RowsetMetaCloudPB> initial_rowsets; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (int version = 2; version <= 3; ++version) { |
| auto rowset = create_rowset(100 + version, old_tablet_id, partition_id, version, |
| version * 10); |
| initial_rowsets.push_back(rowset); |
| |
| auto rowset_key = meta_rowset_key({instance_id, old_tablet_id, version}); |
| auto rowset_val = rowset.SerializeAsString(); |
| txn->put(rowset_key, rowset_val); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {instance_id, old_tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Step 2: Create new tablet in NOTREADY state (empty, waiting for schema change) |
| { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(1000); |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| add_tablet(req, table_id, index_id, partition_id, new_tablet_id); |
| |
| // Override the state of the tablet meta just added so the new tablet is created as NOTREADY |
| auto tablet_meta = req.mutable_tablet_metas(0); |
| tablet_meta->set_tablet_state(doris::TabletStatePB::PB_NOTREADY); |
| |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Step 3: Add alter versions [4-4, 5-5] and start schema change job (alter_version = 5) |
| const std::string job_id = "recycle_schema_change_job"; |
| const std::string initiator = "test_be"; |
| constexpr int64_t alter_version = 5; // Determined when starting the job |
| |
| // Add versions 4-5 to old tablet (NOTE(review): alter_rowsets_old is filled but never read afterwards) |
| std::vector<doris::RowsetMetaCloudPB> alter_rowsets_old; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (int version = 4; version <= 5; ++version) { |
| auto rowset = create_rowset(200 + version, old_tablet_id, partition_id, version, |
| version * 10); |
| alter_rowsets_old.push_back(rowset); |
| |
| auto rowset_key = meta_rowset_key({instance_id, old_tablet_id, version}); |
| auto rowset_val = rowset.SerializeAsString(); |
| txn->put(rowset_key, rowset_val); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {instance_id, old_tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Add versions 4-5 to new tablet; copies are kept in alter_rowsets_new to look up their recycle keys later |
| std::vector<doris::RowsetMetaCloudPB> alter_rowsets_new; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (int version = 4; version <= 5; ++version) { |
| auto rowset = create_rowset(300 + version, new_tablet_id, partition_id, version, |
| version * 10); |
| alter_rowsets_new.push_back(rowset); |
| |
| auto rowset_key = meta_rowset_key({instance_id, new_tablet_id, version}); |
| auto rowset_val = rowset.SerializeAsString(); |
| txn->put(rowset_key, rowset_val); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {instance_id, new_tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Start the schema change job from old tablet to new tablet; its expiration is set to now + 12 |
| { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| StartTabletJobResponse res; |
| req.mutable_job()->mutable_idx()->set_tablet_id(old_tablet_id); |
| auto* schema_change = req.mutable_job()->mutable_schema_change(); |
| schema_change->set_id(job_id); |
| schema_change->set_initiator(initiator); |
| schema_change->mutable_new_tablet_idx()->set_table_id(table_id); |
| schema_change->mutable_new_tablet_idx()->set_index_id(index_id); |
| schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id); |
| schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id); |
| schema_change->set_alter_version(alter_version); |
| |
| long now = time(nullptr); |
| schema_change->set_expiration(now + 12); |
| |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Step 4: Simulate ongoing data ingestion - add post-alter versions [6-6, 7-7] |
| std::vector<doris::RowsetMetaCloudPB> post_alter_rowsets_old; |
| std::vector<doris::RowsetMetaCloudPB> post_alter_rowsets_new; |
| |
| // Add versions 6-7 to old tablet (NOTE(review): post_alter_rowsets_old is filled but never read afterwards) |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (int version = 6; version <= 7; ++version) { |
| auto rowset = create_rowset(400 + version, old_tablet_id, partition_id, version, |
| version * 10); |
| post_alter_rowsets_old.push_back(rowset); |
| |
| auto rowset_key = meta_rowset_key({instance_id, old_tablet_id, version}); |
| auto rowset_val = rowset.SerializeAsString(); |
| txn->put(rowset_key, rowset_val); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {instance_id, old_tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Add versions 6-7 to new tablet; copies are kept in post_alter_rowsets_new for the recycle-key checks later |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| for (int version = 6; version <= 7; ++version) { |
| auto rowset = create_rowset(500 + version, new_tablet_id, partition_id, version, |
| version * 10); |
| post_alter_rowsets_new.push_back(rowset); |
| |
| auto rowset_key = meta_rowset_key({instance_id, new_tablet_id, version}); |
| auto rowset_val = rowset.SerializeAsString(); |
| txn->put(rowset_key, rowset_val); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {instance_id, new_tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Step 5: Complete schema change - convert old tablet data via tmp rowsets |
| // Create tmp rowsets representing old tablet's [2-2, 3-3, 4-4, 5-5] data that will be converted to new tablet |
| std::vector<int64_t> tmp_txn_ids; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Create one tmp rowset per version 2..5, each under its own txn id (40000 + version) |
| for (int version = 2; version <= 5; ++version) { |
| int64_t tmp_txn_id = 40000 + version; |
| tmp_txn_ids.push_back(tmp_txn_id); |
| |
| // The tmp rowset belongs to the new tablet and is stored under meta_rowset_tmp_key |
| auto tmp_rowset = create_rowset(600 + version, new_tablet_id, partition_id, version, |
| version * 10); |
| tmp_rowset.set_txn_id(tmp_txn_id); |
| |
| auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, tmp_txn_id, new_tablet_id}); |
| auto tmp_rowset_val = tmp_rowset.SerializeAsString(); |
| txn->put(tmp_rowset_key, tmp_rowset_val); |
| } |
| |
| // Seed tablet stats (stats_tablet_key) and versioned tablet load stats for both tablets |
| TabletStatsPB old_tablet_stats; |
| old_tablet_stats.set_num_rows(270); // sum of versions 2-7: 20+30+40+50+60+70 |
| old_tablet_stats.set_data_size(270 * 100); |
| old_tablet_stats.set_num_rowsets(6); |
| old_tablet_stats.set_num_segments(6); |
| old_tablet_stats.set_index_size(300); |
| old_tablet_stats.set_segment_size(600); |
| old_tablet_stats.set_cumulative_point(3); |
| |
| auto old_stats_key = |
| stats_tablet_key({instance_id, table_id, index_id, partition_id, old_tablet_id}); |
| txn->put(old_stats_key, old_tablet_stats.SerializeAsString()); |
| |
| // Create tablet load stats for old tablet (versions 2-2, 3-3, 4-4, 5-5, 6-6, 7-7: all data) |
| TabletStatsPB old_tablet_load_stats; |
| old_tablet_load_stats.set_num_rows(270); // all versions 2-7: 20+30+40+50+60+70 |
| old_tablet_load_stats.set_data_size(270 * 100); |
| old_tablet_load_stats.set_num_rowsets(6); |
| old_tablet_load_stats.set_num_segments(6); |
| old_tablet_load_stats.set_index_size(300); |
| old_tablet_load_stats.set_segment_size(600); |
| old_tablet_load_stats.set_cumulative_point(3); |
| |
| auto old_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, old_tablet_id}); |
| versioned_put(txn.get(), old_tablet_load_stats_key, |
| old_tablet_load_stats.SerializeAsString()); |
| |
| TabletStatsPB new_tablet_stats; |
| new_tablet_stats.set_num_rows(220); // versions 4-7: 40+50+60+70 |
| new_tablet_stats.set_data_size(220 * 100); |
| new_tablet_stats.set_num_rowsets(4); |
| new_tablet_stats.set_num_segments(4); |
| new_tablet_stats.set_index_size(200); |
| new_tablet_stats.set_segment_size(400); |
| new_tablet_stats.set_cumulative_point(3); |
| |
| auto new_stats_key = |
| stats_tablet_key({instance_id, table_id, index_id, partition_id, new_tablet_id}); |
| txn->put(new_stats_key, new_tablet_stats.SerializeAsString()); |
| |
| // Create initial tablet load stats for new tablet (versions 4-4, 5-5, 6-6, 7-7) |
| TabletStatsPB new_tablet_load_stats; |
| new_tablet_load_stats.set_num_rows(220); // versions 4-7: 40+50+60+70 |
| new_tablet_load_stats.set_data_size(220 * 100); |
| new_tablet_load_stats.set_num_rowsets(4); |
| new_tablet_load_stats.set_num_segments(4); |
| new_tablet_load_stats.set_index_size(200); |
| new_tablet_load_stats.set_segment_size(400); |
| new_tablet_load_stats.set_cumulative_point(6); |
| |
| auto new_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, new_tablet_id}); |
| versioned_put(txn.get(), new_tablet_load_stats_key, |
| new_tablet_load_stats.SerializeAsString()); |
| |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| size_t num_logs_before = count_range(txn_kv.get(), versioned::log_key(instance_id), |
| versioned::log_key(instance_id) + "\xFF"); |
| |
| // Call finish_tablet_job to complete schema change (triggers process_schema_change_job) |
| { |
| brpc::Controller cntl; |
| FinishTabletJobRequest req; |
| FinishTabletJobResponse res; |
| |
| req.set_action(FinishTabletJobRequest::COMMIT); |
| req.mutable_job()->mutable_idx()->set_table_id(table_id); |
| req.mutable_job()->mutable_idx()->set_index_id(index_id); |
| req.mutable_job()->mutable_idx()->set_partition_id(partition_id); |
| req.mutable_job()->mutable_idx()->set_tablet_id(old_tablet_id); |
| |
| auto* schema_change = req.mutable_job()->mutable_schema_change(); |
| schema_change->set_id(job_id); |
| schema_change->set_initiator(initiator); |
| schema_change->mutable_new_tablet_idx()->set_table_id(table_id); |
| schema_change->mutable_new_tablet_idx()->set_index_id(index_id); |
| schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id); |
| schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id); |
| schema_change->set_alter_version(alter_version); |
| |
| // Specify tmp txn ids containing converted data from old tablet [2-2, 3-3, 4-4, 5-5] |
| for (int64_t tmp_txn_id : tmp_txn_ids) { |
| schema_change->add_txn_ids(tmp_txn_id); |
| } |
| |
| // Add output versions corresponding to tmp rowsets [2-2, 3-3, 4-4, 5-5] |
| for (int version = 2; version <= 5; ++version) { |
| schema_change->add_output_versions(version); |
| } |
| |
| // Add schema change statistics |
| // Output rowsets [2-2, 3-3, 4-4, 5-5]: 20+30+40+50 = 140 |
| schema_change->set_num_output_rows(140); |
| schema_change->set_num_output_rowsets(4); |
| schema_change->set_num_output_segments(4); |
| schema_change->set_size_output_rowsets(140 * 100); |
| schema_change->set_index_size_output_rowsets(200); |
| schema_change->set_segment_size_output_rowsets(400); |
| schema_change->set_output_cumulative_point(5); |
| |
| meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Verification Step: finish_tablet_job must have added at least one key under the instance's operation log prefix |
| size_t num_logs_after = count_range(txn_kv.get(), versioned::log_key(instance_id), |
| versioned::log_key(instance_id) + "\xFF"); |
| ASSERT_GT(num_logs_after, num_logs_before) |
| << "Expected new schema change operation log, but found no new logs"; |
| |
| // Verify every recycle rowset meta in the schema change log has no segments_key_bounds and no truncated flag |
| { |
| Versionstamp log_version; |
| OperationLogPB operation_log; |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string log_key = versioned::log_key({instance_id}); |
| ASSERT_EQ(read_operation_log(txn.get(), log_key, &log_version, &operation_log), |
| TxnErrorCode::TXN_OK); |
| ASSERT_TRUE(operation_log.has_schema_change()); |
| const auto& schema_change_log = operation_log.schema_change(); |
| ASSERT_GT(schema_change_log.recycle_rowsets_size(), 0); |
| for (const auto& recycle_rs : schema_change_log.recycle_rowsets()) { |
| ASSERT_EQ(recycle_rs.rowset_meta().segments_key_bounds_size(), 0); |
| ASSERT_FALSE(recycle_rs.rowset_meta().has_segments_key_bounds_truncated()); |
| } |
| } |
| |
| // Verify the new tablet's meta_rowset_compact_keys exist before recycling |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Check that meta_rowset_compact_keys exist for all converted rowsets [2-2, 3-3, 4-4, 5-5] |
| for (int version = 2; version <= 5; ++version) { |
| auto meta_rowset_compact_key = |
| versioned::meta_rowset_compact_key({instance_id, new_tablet_id, version}); |
| doris::RowsetMetaCloudPB compact_rowset_meta; |
| Versionstamp versionstamp; |
| ASSERT_EQ(versioned::document_get(txn.get(), meta_rowset_compact_key, |
| &compact_rowset_meta, &versionstamp), |
| TxnErrorCode::TXN_OK) |
| << "Meta rowset compact key should exist for version " << version |
| << " before recycling"; |
| } |
| } |
| |
| // Set up recycler using the same txn_kv as meta_service |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(instance_id); |
| instance_info.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED); |
| InstanceRecycler recycler(txn_kv, instance_info, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| // Now recycle the schema change operation log (only the return code is asserted here) |
| ASSERT_EQ(recycler.recycle_operation_logs(), 0); |
| |
| // Verify the key state left behind after recycling the operation logs |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Check that meta_rowset_keys exist for all versions 2-7 on the new tablet (this test only wrote 4-7 there directly) |
| for (int version = 2; version <= 7; ++version) { |
| auto meta_key = meta_rowset_key({instance_id, new_tablet_id, version}); |
| std::string rowset_value; |
| ASSERT_EQ(txn->get(meta_key, &rowset_value), TxnErrorCode::TXN_OK) |
| << "Meta rowset key should exist for version " << version |
| << " after schema change"; |
| } |
| |
| // Check that recycle_rowset_keys exist only for deleted versions 4-5 (schema change deletes new tablet's original 4-5 rowsets) |
| for (int version = 4; version <= 5; ++version) { |
| int idx = version - 4; |
| std::string rowset_id_v2 = alter_rowsets_new[idx].rowset_id_v2(); |
| std::string recycle_key = |
| recycle_rowset_key({instance_id, new_tablet_id, rowset_id_v2}); |
| std::string recycle_value; |
| ASSERT_EQ(txn->get(recycle_key, &recycle_value), TxnErrorCode::TXN_OK) |
| << "Recycle rowset key should exist for deleted version " << version |
| << " with rowset_id_v2 " << rowset_id_v2; |
| } |
| |
| // Check that recycle_rowset_keys do NOT exist for versions 2-3 (these are newly created from tmp conversion, not deleted) |
| // Note: We cannot easily check this as we don't know the exact rowset_id_v2 for newly created rowsets |
| |
| // Check that recycle_rowset_keys do NOT exist for versions 6-7 (these were not deleted by schema change) |
| for (int version = 6; version <= 7; ++version) { |
| int idx = version - 6; |
| std::string rowset_id_v2 = post_alter_rowsets_new[idx].rowset_id_v2(); |
| std::string recycle_key = |
| recycle_rowset_key({instance_id, new_tablet_id, rowset_id_v2}); |
| std::string recycle_value; |
| ASSERT_EQ(txn->get(recycle_key, &recycle_value), TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Recycle rowset key should NOT exist for non-deleted version " << version |
| << " with rowset_id_v2 " << rowset_id_v2; |
| } |
| |
| // Verify that no meta_rowset_compact_key is found for the old tablet's versions 2-5 after recycling operation logs |
| for (int version = 2; version <= 5; ++version) { |
| auto old_meta_rowset_compact_key = |
| versioned::meta_rowset_compact_key({instance_id, old_tablet_id, version}); |
| doris::RowsetMetaCloudPB compact_rowset_meta; |
| Versionstamp versionstamp; |
| ASSERT_EQ(versioned::document_get(txn.get(), old_meta_rowset_compact_key, |
| &compact_rowset_meta, &versionstamp), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Old tablet meta rowset compact key should be deleted for version " |
| << version << " after recycling operation logs"; |
| } |
| |
| // Verify that new tablet's meta_rowset_compact_keys still exist after recycling operation logs |
| for (int version = 2; version <= 5; ++version) { |
| auto new_meta_rowset_compact_key = |
| versioned::meta_rowset_compact_key({instance_id, new_tablet_id, version}); |
| doris::RowsetMetaCloudPB compact_rowset_meta; |
| Versionstamp versionstamp; |
| ASSERT_EQ(versioned::document_get(txn.get(), new_meta_rowset_compact_key, |
| &compact_rowset_meta, &versionstamp), |
| TxnErrorCode::TXN_OK) |
| << "New tablet meta rowset compact key should still exist for version " |
| << version << " after recycling operation logs"; |
| } |
| } |
| |
| // Verify the versioned tablet load stats of both tablets exist before recycling tablets |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Check tablet load stats keys exist (a null Versionstamp pointer is passed to versioned_get) |
| auto old_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, old_tablet_id}); |
| auto new_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, new_tablet_id}); |
| std::string load_stats_value; |
| Versionstamp* versionstamp = nullptr; |
| |
| ASSERT_EQ(versioned_get(txn.get(), old_tablet_load_stats_key, versionstamp, |
| &load_stats_value), |
| TxnErrorCode::TXN_OK) |
| << "Old tablet load stats should exist before recycling tablets"; |
| ASSERT_EQ(versioned_get(txn.get(), new_tablet_load_stats_key, versionstamp, |
| &load_stats_value), |
| TxnErrorCode::TXN_OK) |
| << "New tablet load stats should exist before recycling tablets"; |
| } |
| |
| // Verify that the non-versioned meta_tablet_keys of both tablets exist before recycling tablets |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| auto old_tablet_key = |
| meta_tablet_key({instance_id, table_id, index_id, partition_id, old_tablet_id}); |
| auto new_tablet_key = |
| meta_tablet_key({instance_id, table_id, index_id, partition_id, new_tablet_id}); |
| std::string tablet_meta_value; |
| |
| LOG(INFO) << "Checking old_tablet_key: " << hex(old_tablet_key); |
| TxnErrorCode old_err = txn->get(old_tablet_key, &tablet_meta_value); |
| LOG(INFO) << "Old tablet key get result: " << old_err; |
| |
| LOG(INFO) << "Checking new_tablet_key: " << hex(new_tablet_key); |
| TxnErrorCode new_err = txn->get(new_tablet_key, &tablet_meta_value); |
| LOG(INFO) << "New tablet key get result: " << new_err; |
| |
| ASSERT_EQ(old_err, TxnErrorCode::TXN_OK) |
| << "Old tablet meta key should exist before recycling"; |
| ASSERT_EQ(new_err, TxnErrorCode::TXN_OK) |
| << "New tablet meta key should exist before recycling"; |
| } |
| |
| // Finally, recycle the tablets of (table_id, index_id) and require a zero return code |
| RecyclerMetricsContext ctx(instance_id, "test"); |
| int recycled = recycler.recycle_tablets(table_id, index_id, ctx); |
| LOG(INFO) << "recycle_tablets returned: " << recycled; |
| ASSERT_EQ(recycled, 0); |
| |
| // Verify tablet load stats are now deleted (recycling tablets should delete them) |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| // Check that the versioned tablet load stats keys of both tablets are no longer found |
| auto old_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, old_tablet_id}); |
| auto new_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, new_tablet_id}); |
| std::string load_stats_value; |
| Versionstamp* versionstamp = nullptr; |
| |
| ASSERT_EQ(versioned_get(txn.get(), old_tablet_load_stats_key, versionstamp, |
| &load_stats_value), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Old tablet load stats should be deleted after recycling tablets"; |
| ASSERT_EQ(versioned_get(txn.get(), new_tablet_load_stats_key, versionstamp, |
| &load_stats_value), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "New tablet load stats should be deleted after recycling tablets"; |
| |
| // Check that the versioned meta tablet keys of both tablets are no longer found |
| auto old_tablet_key = versioned::meta_tablet_key({instance_id, old_tablet_id}); |
| auto new_tablet_key = versioned::meta_tablet_key({instance_id, new_tablet_id}); |
| std::string tablet_meta_value; |
| |
| ASSERT_EQ(versioned_get(txn.get(), old_tablet_key, versionstamp, &tablet_meta_value), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "Old versioned meta tablet key should be deleted after recycling tablets"; |
| ASSERT_EQ(versioned_get(txn.get(), new_tablet_key, versionstamp, &tablet_meta_value), |
| TxnErrorCode::TXN_KEY_NOT_FOUND) |
| << "New versioned meta tablet key should be deleted after recycling tablets"; |
| |
| // Verify that new tablet's meta_rowset_compact_keys still exist after recycling tablets |
| // (this test expects recycle_tablets to leave them in place) |
| for (int version = 2; version <= 5; ++version) { |
| auto new_meta_rowset_compact_key = |
| versioned::meta_rowset_compact_key({instance_id, new_tablet_id, version}); |
| doris::RowsetMetaCloudPB compact_rowset_meta; |
| Versionstamp versionstamp; |
| ASSERT_EQ(versioned::document_get(txn.get(), new_meta_rowset_compact_key, |
| &compact_rowset_meta, &versionstamp), |
| TxnErrorCode::TXN_OK) |
| << "New tablet meta rowset compact key should still exist for version " |
| << version << " after recycling tablets"; |
| } |
| } |
| } |
| |
| TEST(RecycleOperationLogTest, RecycleSchemaChangeLogKeepKeyBoundsWhenDisabled) { |
| auto old_flag = config::enable_recycle_rowset_strip_key_bounds; |
| config::enable_recycle_rowset_strip_key_bounds = false; |
| DORIS_CLOUD_DEFER { |
| config::enable_recycle_rowset_strip_key_bounds = old_flag; |
| }; |
| |
| auto meta_service = get_meta_service(false); |
| std::string instance_id = "recycle_schema_change_keep_bounds"; |
| auto* sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("check_lazy_txn_finished::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("delete_rowset_data::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("recycle_tablet::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = false; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| constexpr int64_t table_id = 52001; |
| constexpr int64_t index_id = 52002; |
| constexpr int64_t partition_id = 52003; |
| constexpr int64_t old_tablet_id = 52004; |
| constexpr int64_t new_tablet_id = 52005; |
| constexpr int64_t alter_version = 5; |
| const std::string job_id = "recycle_schema_change_keep_bounds_job"; |
| const std::string initiator = "test_be"; |
| |
| { |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(instance_id); |
| instance_info.set_multi_version_status(MULTI_VERSION_ENABLED); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(instance_key(instance_id), instance_info.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| meta_service->resource_mgr()->refresh_instance(instance_id); |
| ASSERT_TRUE(meta_service->resource_mgr()->is_version_write_enabled(instance_id)); |
| } |
| |
| // Create old tablet |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, old_tablet_id); |
| auto txn_kv = meta_service->txn_kv(); |
| |
| // Add initial rowsets 2-3 to old tablet |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int version = 2; version <= 3; ++version) { |
| auto rowset = create_rowset(1000 + version, old_tablet_id, partition_id, version, |
| version * 10); |
| auto rowset_key = meta_rowset_key({instance_id, old_tablet_id, version}); |
| txn->put(rowset_key, rowset.SerializeAsString()); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {instance_id, old_tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create new tablet in NOTREADY state |
| { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(1); |
| req.set_cloud_unique_id("test_cloud_unique_id_keep_bounds"); |
| add_tablet(req, table_id, index_id, partition_id, new_tablet_id); |
| req.mutable_tablet_metas(0)->set_tablet_state(doris::TabletStatePB::PB_NOTREADY); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Add alter versions 4-5 to both old and new tablets |
| std::vector<doris::RowsetMetaCloudPB> alter_rowsets_new; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int version = 4; version <= 5; ++version) { |
| auto rowset_old = create_rowset(1100 + version, old_tablet_id, partition_id, version, |
| version * 10); |
| auto rowset_key_old = meta_rowset_key({instance_id, old_tablet_id, version}); |
| txn->put(rowset_key_old, rowset_old.SerializeAsString()); |
| auto versioned_rowset_key_old = versioned::meta_rowset_load_key( |
| {instance_id, old_tablet_id, rowset_old.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key_old, std::move(rowset_old)); |
| |
| auto rowset_new = create_rowset(1200 + version, new_tablet_id, partition_id, version, |
| version * 10); |
| alter_rowsets_new.push_back(rowset_new); |
| auto rowset_key_new = meta_rowset_key({instance_id, new_tablet_id, version}); |
| txn->put(rowset_key_new, rowset_new.SerializeAsString()); |
| auto versioned_rowset_key_new = versioned::meta_rowset_load_key( |
| {instance_id, new_tablet_id, rowset_new.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key_new, std::move(rowset_new)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Start schema change job |
| { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| StartTabletJobResponse res; |
| req.mutable_job()->mutable_idx()->set_tablet_id(old_tablet_id); |
| auto* schema_change = req.mutable_job()->mutable_schema_change(); |
| schema_change->set_id(job_id); |
| schema_change->set_initiator(initiator); |
| schema_change->mutable_new_tablet_idx()->set_table_id(table_id); |
| schema_change->mutable_new_tablet_idx()->set_index_id(index_id); |
| schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id); |
| schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id); |
| schema_change->set_alter_version(alter_version); |
| long now = time(nullptr); |
| schema_change->set_expiration(now + 12); |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Create tmp rowsets for versions 2-5 (converted data) |
| std::vector<int64_t> tmp_txn_ids; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int version = 2; version <= 5; ++version) { |
| int64_t tmp_txn_id = 520000 + version; |
| tmp_txn_ids.push_back(tmp_txn_id); |
| auto tmp_rowset = create_rowset(1300 + version, new_tablet_id, partition_id, version, |
| version * 10); |
| tmp_rowset.set_txn_id(tmp_txn_id); |
| auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, tmp_txn_id, new_tablet_id}); |
| txn->put(tmp_rowset_key, tmp_rowset.SerializeAsString()); |
| } |
| |
| // Seed tablet stats for new tablet to satisfy stats fetch |
| TabletStatsPB new_tablet_stats; |
| new_tablet_stats.set_num_rows(90); // versions 4-5: 40+50 |
| new_tablet_stats.set_data_size(90 * 100); |
| new_tablet_stats.set_num_rowsets(2); |
| new_tablet_stats.set_num_segments(2); |
| new_tablet_stats.set_index_size(100); |
| new_tablet_stats.set_segment_size(200); |
| new_tablet_stats.set_cumulative_point(5); |
| |
| auto new_stats_key = |
| stats_tablet_key({instance_id, table_id, index_id, partition_id, new_tablet_id}); |
| txn->put(new_stats_key, new_tablet_stats.SerializeAsString()); |
| |
| auto new_tablet_load_stats_key = |
| versioned::tablet_load_stats_key({instance_id, new_tablet_id}); |
| versioned_put(txn.get(), new_tablet_load_stats_key, new_tablet_stats.SerializeAsString()); |
| |
| auto new_tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({instance_id, new_tablet_id}); |
| versioned_put(txn.get(), new_tablet_compact_stats_key, |
| new_tablet_stats.SerializeAsString()); |
| |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Finish schema change job |
| { |
| brpc::Controller cntl; |
| FinishTabletJobRequest req; |
| FinishTabletJobResponse res; |
| req.set_action(FinishTabletJobRequest::COMMIT); |
| req.mutable_job()->mutable_idx()->set_table_id(table_id); |
| req.mutable_job()->mutable_idx()->set_index_id(index_id); |
| req.mutable_job()->mutable_idx()->set_partition_id(partition_id); |
| req.mutable_job()->mutable_idx()->set_tablet_id(old_tablet_id); |
| |
| auto* schema_change = req.mutable_job()->mutable_schema_change(); |
| schema_change->set_id(job_id); |
| schema_change->set_initiator(initiator); |
| schema_change->mutable_new_tablet_idx()->set_table_id(table_id); |
| schema_change->mutable_new_tablet_idx()->set_index_id(index_id); |
| schema_change->mutable_new_tablet_idx()->set_partition_id(partition_id); |
| schema_change->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id); |
| schema_change->set_alter_version(alter_version); |
| for (int64_t tmp_txn_id : tmp_txn_ids) { |
| schema_change->add_txn_ids(tmp_txn_id); |
| } |
| for (int version = 2; version <= 5; ++version) { |
| schema_change->add_output_versions(version); |
| } |
| schema_change->set_num_output_rows(140); // 20+30+40+50 |
| schema_change->set_num_output_rowsets(4); |
| schema_change->set_num_output_segments(4); |
| schema_change->set_size_output_rowsets(140 * 100); |
| schema_change->set_index_size_output_rowsets(200); |
| schema_change->set_segment_size_output_rowsets(400); |
| schema_change->set_output_cumulative_point(5); |
| |
| meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // Verify schema change operation log keeps key bounds when stripping is disabled |
| { |
| Versionstamp log_version; |
| OperationLogPB operation_log; |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string log_key = versioned::log_key({instance_id}); |
| ASSERT_EQ(read_operation_log(txn.get(), log_key, &log_version, &operation_log), |
| TxnErrorCode::TXN_OK); |
| ASSERT_TRUE(operation_log.has_schema_change()); |
| const auto& schema_change_log = operation_log.schema_change(); |
| ASSERT_EQ(schema_change_log.recycle_rowsets_size(), alter_rowsets_new.size()); |
| for (const auto& recycle_rs : schema_change_log.recycle_rowsets()) { |
| ASSERT_GT(recycle_rs.rowset_meta().segments_key_bounds_size(), 0); |
| ASSERT_TRUE(recycle_rs.rowset_meta().has_segments_key_bounds_truncated()); |
| ASSERT_FALSE(recycle_rs.rowset_meta().segments_key_bounds_truncated()); |
| } |
| } |
| } |
| |
// Seeds one record of each kind below (tablet meta, partition version, partition index,
// rowset meta, tablet stats, data ref count, operation log) using the global `instance_id`,
// marks the instance DELETED, runs InstanceRecycler::recycle_deleted_instance(), and then
// expects exactly one key to be left in the store.
| TEST(RecycleOperationLogTest, RecycleDeletedInstance) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
// Store the instance info with multi-version status WRITE_ONLY before building the recycler.
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY); |
| update_instance_info(txn_kv.get(), instance); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
// Arbitrary ids shared by every record seeded below.
| uint64_t tablet_id = 1, partition_id = 2, index_id = 3, table_id = 4, db_id = 5; |
| { |
| // Create tablet meta and write it with versioned_put. |
| std::string tablet_meta_key = versioned::meta_tablet_key({instance_id, tablet_id}); |
| doris::TabletMetaCloudPB tablet_meta; |
| tablet_meta.set_tablet_id(tablet_id); |
| tablet_meta.set_creation_time(::time(nullptr)); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), tablet_meta_key, tablet_meta.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put partition version (version 12345, update time 0). |
| std::string partition_version_key = |
| versioned::partition_version_key({instance_id, partition_id}); |
| VersionPB version; |
| version.set_version(12345); |
| version.set_update_time_ms(0); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), partition_version_key, version.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put partition index; unlike the records above it is written with a plain txn->put. |
| std::string partition_index_key = |
| versioned::partition_index_key({instance_id, partition_id}); |
| PartitionIndexPB partition_index; |
| partition_index.set_table_id(table_id); |
| partition_index.set_db_id(db_id); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(partition_index_key, partition_index.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put rowset meta, tablet stats and data ref count in a single transaction. |
| // Create rowset meta (single version 2, rowset_id_v2 "test_rowset_id"). |
| std::string rowset_key = meta_rowset_key({instance_id, tablet_id, 2}); |
| doris::RowsetMetaCloudPB rowset_meta; |
| rowset_meta.set_rowset_id(12345); |
| rowset_meta.set_rowset_id_v2("test_rowset_id"); |
| rowset_meta.set_tablet_id(tablet_id); |
| rowset_meta.set_partition_id(partition_id); |
| rowset_meta.set_start_version(2); |
| rowset_meta.set_end_version(2); |
| rowset_meta.set_num_rows(100); |
| rowset_meta.set_data_disk_size(10000); |
| rowset_meta.set_index_disk_size(1000); |
| rowset_meta.set_total_disk_size(11000); |
| |
| // Create tablet stats |
| std::string tablet_stats_key = |
| stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| TabletStatsPB tablet_stats; |
| tablet_stats.set_num_rows(100); |
| tablet_stats.set_data_size(10000); |
| tablet_stats.set_num_rowsets(1); |
| tablet_stats.set_num_segments(1); |
| tablet_stats.set_index_size(1000); |
| tablet_stats.set_segment_size(11000); |
| tablet_stats.set_cumulative_point(1); |
| |
| // Create data ref count; the key uses the same id string as rowset_id_v2 above. |
| std::string data_ref_count_key = |
| versioned::data_rowset_ref_count_key({instance_id, tablet_id, "test_rowset_id"}); |
| std::string ref_count = "1"; |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
// NOTE(review): rowset_key and tablet_stats_key are built by the non-`versioned::` key
// helpers (meta_rowset_key / stats_tablet_key) yet are written through
// versioned::document_put / versioned_put. Presumably acceptable here because the test
// only checks that the keys are removed -- confirm this is intentional.
| versioned::document_put(txn.get(), rowset_key, std::move(rowset_meta)); |
| versioned_put(txn.get(), tablet_stats_key, tablet_stats.SerializeAsString()); |
| versioned_put(txn.get(), data_ref_count_key, ref_count); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Put one operation log whose min_timestamp comes from Versionstamp(123, 0). |
| std::string log_key = versioned::log_key(instance_id); |
| Versionstamp versionstamp(123, 0); |
| OperationLogPB operation_log; |
| operation_log.set_min_timestamp(versionstamp.version()); |
| |
| // Create a commit txn log |
| auto* commit_txn = operation_log.mutable_commit_txn(); |
| commit_txn->set_db_id(db_id); |
| commit_txn->set_txn_id(100); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned::blob_put(txn.get(), log_key, operation_log); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| { |
| // Mark instance deleted by rewriting the instance info with status DELETED. |
| InstanceInfoPB deleted_instance = instance; |
| deleted_instance.set_status(InstanceInfoPB::DELETED); |
| update_instance_info(txn_kv.get(), deleted_instance); |
| } |
| |
| ASSERT_EQ(recycler.recycle_deleted_instance(), 0); |
| |
| // Verify that exactly one key remains; dump_range output is attached to the failure message. |
// NOTE(review): the survivor is presumably the instance info record written by
// update_instance_info -- confirm against that helper.
| ASSERT_EQ(count_range(txn_kv.get()), 1) << dump_range(txn_kv.get()); |
| } |
| |
| // Test OperationLogRecycleChecker class |
// Drives OperationLogRecycleChecker::can_recycle() against a MemTxnKv store in four stages:
//   1. no snapshots written            -> logs are recyclable;
//   2. a log with no min_timestamp set -> still recyclable;
//   3. two snapshots written           -> results depend on where the snapshots fall relative
//                                         to the (min_timestamp, log versionstamp) pair;
//   4. instance has source_snapshot_id -> the checked log is not recyclable.
// Versionstamps are sampled from transaction read versions between writes, so the order of
// the insert/write/sample calls below is significant.
| TEST(OperationLogRecycleCheckerTest, InitAndBasicCheck) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| txn_kv->update_commit_version(1000); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| std::string test_instance_id = "test_operation_log_recycle_checker"; |
// Returns a Versionstamp built from the read version of a freshly created transaction.
| auto get_current_versionstamp = [&]() -> Versionstamp { |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
// EXPECT_* is non-fatal, so execution continues past a failed get_read_version();
// initialize the output so the returned Versionstamp never carries an indeterminate value.
| int64_t read_version = 0; |
| EXPECT_EQ(txn->get_read_version(&read_version), TxnErrorCode::TXN_OK); |
| return Versionstamp(read_version, 0); |
| }; |
| |
// Commits a write of key "dummy" with an empty value.
// NOTE(review): presumably used only to advance the store's commit version between
// sampled versionstamps -- confirm against MemTxnKv.
| auto insert_empty_value = [&]() { |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put("dummy", ""); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| }; |
| |
| insert_empty_value(); |
| |
// Sampled before any snapshot is written; used below as the "old" log versionstamp.
| Versionstamp old_version = get_current_versionstamp(); |
| |
| insert_empty_value(); |
| |
| // Test initialization without snapshots |
| OperationLogReferenceInfo reference_info; |
| { |
| InstanceInfoPB instance_info; |
| OperationLogRecycleChecker checker(test_instance_id, txn_kv.get(), instance_info); |
| ASSERT_EQ(checker.init(), 0); |
| |
| // All logs should be recyclable when no snapshots exist |
| ASSERT_TRUE(checker.can_recycle(old_version, 1, &reference_info)) << old_version.version(); |
| } |
| |
| { |
| // Even a log without min_timestamp set can be recycled. |
| InstanceInfoPB instance_info; |
| OperationLogRecycleChecker checker(test_instance_id, txn_kv.get(), instance_info); |
| ASSERT_EQ(checker.init(), 0); |
| |
| OperationLogPB op_log; |
| ASSERT_TRUE(checker.can_recycle(old_version, op_log.min_timestamp(), &reference_info)); |
| } |
| |
// Writes a default SnapshotPB under versioned::snapshot_full_key(test_instance_id) and then
// commits one more dummy write. The ASSERT_* macros here return only from this lambda on
// failure, not from the enclosing test body.
| auto write_snapshot = [&]() { |
| // Write snapshot |
| SnapshotPB snapshot; |
| std::string snapshot_key = versioned::snapshot_full_key(test_instance_id); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned_put(txn.get(), snapshot_key, snapshot.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| insert_empty_value(); |
| }; |
| |
// version1 is sampled after the first snapshot, version2 after the second.
| write_snapshot(); |
| Versionstamp version1 = get_current_versionstamp(); |
| |
| write_snapshot(); |
| Versionstamp version2 = get_current_versionstamp(); |
| |
| insert_empty_value(); |
| |
| { |
| InstanceInfoPB instance_info; |
| OperationLogRecycleChecker checker(test_instance_id, txn_kv.get(), instance_info); |
| ASSERT_EQ(checker.init(), 0); |
| |
| // case 1, old operation log can be recycled. |
| ASSERT_TRUE(checker.can_recycle(old_version, 1, &reference_info)); |
| // case 2. snapshot exist in the log range, can not be recycled. |
| ASSERT_FALSE(checker.can_recycle(version1, old_version.version(), &reference_info)) |
| << "version1: " << version1.version() << ", old_version: " << old_version.version(); |
| |
| Versionstamp version3 = get_current_versionstamp(); |
| Versionstamp version4(version3.version(), 1); |
| |
| // case 3. large operation log can not be recycled. |
| ASSERT_FALSE(checker.can_recycle(version4, version2.version(), &reference_info)); |
| |
| // case 4: [min_version, operation log version) |
| ASSERT_TRUE(checker.can_recycle(version1, version1.version(), &reference_info)); |
| } |
| |
| { |
// The instance records a source snapshot id serialized from the current versionstamp;
// with that set, the log checked below is expected to be non-recyclable.
| Versionstamp snapshot_versionstamp = get_current_versionstamp(); |
| InstanceInfoPB instance_info; |
| instance_info.set_source_snapshot_id( |
| SnapshotManager::serialize_snapshot_id(snapshot_versionstamp)); |
| OperationLogRecycleChecker checker(test_instance_id, txn_kv.get(), instance_info); |
| ASSERT_EQ(checker.init(), 0); |
| |
| Versionstamp version5 = get_current_versionstamp(); |
| |
| ASSERT_FALSE(checker.can_recycle(version5, version2.version(), &reference_info)) |
| << "version5: " << version5.version() << ", version2: " << version2.version(); |
| } |
| } |
| |
// With config::enable_recycle_rowset_strip_key_bounds forced to false, runs a cumulative
// compaction job (start + finish/COMMIT) through the meta service and then reads the
// operation log. The compaction log must list two recycle rowsets, each still carrying
// segments_key_bounds and with segments_key_bounds_truncated explicitly set to false.
| TEST(RecycleOperationLogTest, RecycleCompactionLogKeepKeyBoundsWhenDisabled) { |
// Disable key-bounds stripping for this test only; the previous value is restored on exit.
| auto old_flag = config::enable_recycle_rowset_strip_key_bounds; |
| config::enable_recycle_rowset_strip_key_bounds = false; |
| DORIS_CLOUD_DEFER { |
| config::enable_recycle_rowset_strip_key_bounds = old_flag; |
| }; |
| |
| auto meta_service = get_meta_service(false); |
| std::string test_instance_id = "recycle_compaction_log_keep_bounds"; |
| auto* sp = SyncPoint::get_instance(); |
// Remove every sync-point callback installed below when the test scope ends.
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
// "get_instance_id": hand back test_instance_id and set the second field to true.
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = test_instance_id; |
| ret->second = true; |
| }); |
// The two bypass_check hooks write fixed (first, second) pairs: (true, true) and (false, true).
| sp->set_call_back("check_lazy_txn_finished::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = true; |
| ret->second = true; |
| }); |
| sp->set_call_back("recycle_tablet::bypass_check", [&](auto&& args) { |
| auto* ret = doris::try_any_cast_ret<bool>(args); |
| ret->first = false; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| constexpr int64_t table_id = 21001; |
| constexpr int64_t index_id = 21002; |
| constexpr int64_t partition_id = 21003; |
| constexpr int64_t tablet_id = 21004; |
| |
// Store the instance with MULTI_VERSION_ENABLED, refresh the resource manager, and make
// sure it reports version writes as enabled for this instance.
| { |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(test_instance_id); |
| instance_info.set_multi_version_status(MULTI_VERSION_ENABLED); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(instance_key(test_instance_id), instance_info.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| meta_service->resource_mgr()->refresh_instance(test_instance_id); |
| ASSERT_TRUE(meta_service->resource_mgr()->is_version_write_enabled(test_instance_id)); |
| } |
| |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
// Seed two input rowsets, each written under both the plain meta_rowset_key and the
// versioned meta_rowset_load_key, keyed by the rowset's end_version.
// NOTE(review): the 4th and 5th create_rowset arguments look like version and row
// count (versions 2 and 3, rows 20 and 40) -- confirm against the helper.
| std::vector<doris::RowsetMetaCloudPB> input_rowsets; |
| auto txn_kv = meta_service->txn_kv(); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 2; ++i) { |
| auto rowset = create_rowset(i + 500, tablet_id, partition_id, i + 2, 20 * (i + 1)); |
| input_rowsets.push_back(rowset); |
| auto rowset_key = meta_rowset_key({test_instance_id, tablet_id, rowset.end_version()}); |
| txn->put(rowset_key, rowset.SerializeAsString()); |
| auto versioned_rowset_key = versioned::meta_rowset_load_key( |
| {test_instance_id, tablet_id, rowset.end_version()}); |
| versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset)); |
| } |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
// Output rowset of the compaction: end_version is overridden to 3 and it is stored as a
// tmp rowset under txn_id 60001, which the finish request references below.
| constexpr int64_t txn_id = 60001; |
| constexpr int64_t output_end_version = 3; |
| auto output_rowset = create_rowset(700, tablet_id, partition_id, 2, 40); |
| output_rowset.set_end_version(output_end_version); |
| output_rowset.set_txn_id(txn_id); |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| auto tmp_rowset_key = meta_rowset_tmp_key({test_instance_id, txn_id, tablet_id}); |
| txn->put(tmp_rowset_key, output_rowset.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Seed tablet stats to avoid negative adjustments during compaction |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| TabletStatsPB initial_stats; |
| initial_stats.set_num_rows(60); // 20 + 40 |
| initial_stats.set_data_size(6000); // data_disk_size sum |
| initial_stats.set_num_rowsets(2); |
| initial_stats.set_num_segments(2); |
| initial_stats.set_index_size(600); // index_disk_size sum |
| initial_stats.set_segment_size(6000); // data size |
| initial_stats.set_cumulative_point(1); |
| |
// The same stats payload is written under three keys: the plain stats key and the
// versioned load-stats and compact-stats keys.
| auto stats_key = |
| stats_tablet_key({test_instance_id, table_id, index_id, partition_id, tablet_id}); |
| txn->put(stats_key, initial_stats.SerializeAsString()); |
| |
| auto versioned_load_stats_key = |
| versioned::tablet_load_stats_key({test_instance_id, tablet_id}); |
| versioned_put(txn.get(), versioned_load_stats_key, initial_stats.SerializeAsString()); |
| |
| auto tablet_compact_stats_key = |
| versioned::tablet_compact_stats_key({test_instance_id, tablet_id}); |
| versioned_put(txn.get(), tablet_compact_stats_key, initial_stats.SerializeAsString()); |
| |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
// Start the cumulative compaction job "job_keep_bounds" over input versions 2 and 3.
| { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| StartTabletJobResponse res; |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| auto compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id("job_keep_bounds"); |
| compaction->set_initiator("be"); |
| compaction->set_type(TabletCompactionJobPB::CUMULATIVE); |
| compaction->set_base_compaction_cnt(0); |
| compaction->set_cumulative_compaction_cnt(0); |
| compaction->add_input_versions(2); |
| compaction->add_input_versions(3); |
| long now = time(nullptr); |
| compaction->set_expiration(now + 12); |
| compaction->set_lease(now + 3); |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
// Finish (COMMIT) the same job; id, initiator, type and counters repeat the start request.
| { |
| brpc::Controller cntl; |
| FinishTabletJobRequest req; |
| FinishTabletJobResponse res; |
| req.set_action(FinishTabletJobRequest::COMMIT); |
| req.mutable_job()->mutable_idx()->set_table_id(table_id); |
| req.mutable_job()->mutable_idx()->set_index_id(index_id); |
| req.mutable_job()->mutable_idx()->set_partition_id(partition_id); |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| |
| auto compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id("job_keep_bounds"); |
| compaction->set_initiator("be"); |
| compaction->set_type(TabletCompactionJobPB::CUMULATIVE); |
| compaction->set_base_compaction_cnt(0); |
| compaction->set_cumulative_compaction_cnt(0); |
| compaction->add_input_versions(2); |
| compaction->add_input_versions(3); |
| compaction->add_txn_id(txn_id); |
| compaction->add_output_versions(output_end_version); |
| compaction->add_output_rowset_ids(output_rowset.rowset_id_v2()); |
| compaction->set_output_cumulative_point(4); |
| compaction->set_size_input_rowsets(6600); // total_disk_size of inputs |
| compaction->set_index_size_input_rowsets(600); // index_disk_size of inputs |
| compaction->set_segment_size_input_rowsets(6000); // data_disk_size of inputs |
| compaction->set_num_input_rows(60); |
| compaction->set_num_input_rowsets(2); |
| compaction->set_num_input_segments(2); |
| compaction->set_size_output_rowsets(4400); // total_disk_size of output |
| compaction->set_index_size_output_rowsets(400); // index_disk_size of output |
| compaction->set_segment_size_output_rowsets(4000); // data_disk_size of output |
| compaction->set_num_output_rows(40); |
| compaction->set_num_output_rowsets(1); |
| compaction->set_num_output_segments(1); |
| meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
// Read an operation log for this instance via read_operation_log.
| Versionstamp log_version; |
| OperationLogPB operation_log; |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string log_key = versioned::log_key({test_instance_id}); |
| ASSERT_EQ(read_operation_log(txn.get(), log_key, &log_version, &operation_log), |
| TxnErrorCode::TXN_OK); |
| } |
// It must be a compaction log with two recycle rowsets, each keeping its key bounds
// and carrying an explicit segments_key_bounds_truncated == false.
| ASSERT_TRUE(operation_log.has_compaction()); |
| const auto& compaction_log = operation_log.compaction(); |
| ASSERT_EQ(compaction_log.recycle_rowsets_size(), 2); |
| for (const auto& recycle_rs : compaction_log.recycle_rowsets()) { |
| ASSERT_GT(recycle_rs.rowset_meta().segments_key_bounds_size(), 0); |
| ASSERT_TRUE(recycle_rs.rowset_meta().has_segments_key_bounds_truncated()); |
| ASSERT_FALSE(recycle_rs.rowset_meta().segments_key_bounds_truncated()); |
| } |
| } |
| |
| } // namespace doris::cloud |