| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
#include "recycler/recycler.h"

#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>

#include <chrono>
#include <cstring>
#include <memory>
#include <random>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/simple_thread_pool.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/mem_txn_kv.h"
#include "meta-service/meta_service.h"
#include "meta-service/txn_kv_error.h"
#include "mock_accessor.h"
#include "mock_resource_manager.h"
#include "rate-limiter/rate_limiter.h"
#include "recycler/checker.h"
#include "recycler/storage_vault_accessor.h"
#include "recycler/util.h"
#include "recycler/white_black_list.h"
| |
using namespace doris;

// Instance id shared by every test case and helper in this file.
static const std::string instance_id = "instance_id_recycle_test";
// Seconds since epoch, captured once in main(); used as creation_time for the
// mocked KV records written by the helpers below.
static int64_t current_time = 0;
static constexpr int64_t db_id = 1000;

// Worker pools handed to every InstanceRecycler; initialized in main().
static doris::cloud::RecyclerThreadPoolGroup thread_group;
| |
| int main(int argc, char** argv) { |
| auto conf_file = "doris_cloud.conf"; |
| if (!cloud::config::init(conf_file, true)) { |
| std::cerr << "failed to init config file, conf=" << conf_file << std::endl; |
| return -1; |
| } |
| if (!cloud::init_glog("recycler")) { |
| std::cerr << "failed to init glog" << std::endl; |
| return -1; |
| } |
| |
| using namespace std::chrono; |
| current_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
| config::recycler_sleep_before_scheduling_seconds = 0; // we dont have to wait in UT |
| |
| ::testing::InitGoogleTest(&argc, argv); |
| auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| s3_producer_pool->start(); |
| auto recycle_tablet_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| recycle_tablet_pool->start(); |
| auto group_recycle_function_pool = |
| std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism); |
| group_recycle_function_pool->start(); |
| thread_group = |
| RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool), |
| std::move(group_recycle_function_pool)); |
| return RUN_ALL_TESTS(); |
| } |
| |
| namespace doris::cloud { |
| |
| TEST(RecyclerTest, WhiteBlackList) { |
| WhiteBlackList filter; |
| EXPECT_FALSE(filter.filter_out("instance1")); |
| EXPECT_FALSE(filter.filter_out("instance2")); |
| filter.reset({}, {"instance1", "instance2"}); |
| EXPECT_TRUE(filter.filter_out("instance1")); |
| EXPECT_TRUE(filter.filter_out("instance2")); |
| EXPECT_FALSE(filter.filter_out("instance3")); |
| filter.reset({"instance1", "instance2"}, {}); |
| EXPECT_FALSE(filter.filter_out("instance1")); |
| EXPECT_FALSE(filter.filter_out("instance2")); |
| EXPECT_TRUE(filter.filter_out("instance3")); |
| filter.reset({"instance1"}, {"instance1"}); // whitelist overrides blacklist |
| EXPECT_FALSE(filter.filter_out("instance1")); |
| EXPECT_TRUE(filter.filter_out("instance2")); |
| } |
| |
// Returns a process-unique rowset id by bumping a monotonically increasing
// counter ("1", "2", ...).
static std::string next_rowset_id() {
    static int64_t counter = 0;
    counter += 1;
    return std::to_string(counter);
}
| |
| static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, int64_t tablet_id, |
| int64_t index_id, int num_segments, |
| const doris::TabletSchemaCloudPB& schema, |
| int64_t txn_id = 0) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // useless but required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_txn_id(txn_id); |
| rowset.set_num_segments(num_segments); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_index_id(index_id); |
| rowset.set_resource_id(resource_id); |
| rowset.set_schema_version(schema.schema_version()); |
| rowset.mutable_tablet_schema()->CopyFrom(schema); |
| return rowset; |
| } |
| |
| static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, int64_t tablet_id, |
| int64_t index_id, int num_segments, |
| const doris::TabletSchemaCloudPB& schema, |
| RowsetStatePB rowset_state, int64_t txn_id = 0) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // useless but required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_txn_id(txn_id); |
| rowset.set_num_segments(num_segments); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_index_id(index_id); |
| rowset.set_resource_id(resource_id); |
| rowset.set_schema_version(schema.schema_version()); |
| rowset.mutable_tablet_schema()->CopyFrom(schema); |
| rowset.set_rowset_state(rowset_state); |
| return rowset; |
| } |
| |
// Writes a RecycleRowsetPB KV for `rowset` and uploads its segment and
// inverted index (v1 layout) files to `accessor`, giving the recycler data to
// delete. `type` == RecycleRowsetPB::UNKNOWN simulates the legacy KV layout
// that only carries tablet_id/resource_id; any other type embeds the full
// rowset meta. When `write_schema_kv` is true the tablet schema is detached
// from the meta and stored under a separate meta_schema_key instead.
// Returns 0 on success, -1 on KV transaction failure.
static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor,
                                 const doris::RowsetMetaCloudPB& rowset, RecycleRowsetPB::Type type,
                                 bool write_schema_kv) {
    std::string key;
    std::string val;

    RecycleRowsetKeyInfo key_info {instance_id, rowset.tablet_id(), rowset.rowset_id_v2()};
    recycle_rowset_key(key_info, &key);

    RecycleRowsetPB rowset_pb;
    rowset_pb.set_creation_time(current_time);
    if (type != RecycleRowsetPB::UNKNOWN) {
        rowset_pb.set_type(type);
        rowset_pb.mutable_rowset_meta()->CopyFrom(rowset);
        if (write_schema_kv) { // Detach schema
            rowset_pb.mutable_rowset_meta()->set_allocated_tablet_schema(nullptr);
        }
    } else { // old version RecycleRowsetPB
        rowset_pb.set_tablet_id(rowset.tablet_id());
        rowset_pb.set_resource_id(rowset.resource_id());
    }
    rowset_pb.SerializeToString(&val);

    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    txn->put(key, val);
    std::string schema_key, schema_val;
    if (write_schema_kv) {
        // The detached schema lives under (index_id, schema_version).
        meta_schema_key({instance_id, rowset.index_id(), rowset.schema_version()}, &schema_key);
        rowset.tablet_schema().SerializeToString(&schema_val);
        txn->put(schema_key, schema_val);
    }
    if (txn->commit() != TxnErrorCode::TXN_OK) {
        return -1;
    }

    // Upload one (empty) object per segment, plus one per inverted index.
    for (int i = 0; i < rowset.num_segments(); ++i) {
        auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i);
        accessor->put_file(path, "");
        for (auto& index : rowset.tablet_schema().index()) {
            auto path = inverted_index_path_v1(rowset.tablet_id(), rowset.rowset_id_v2(), i,
                                               index.index_id(), index.index_suffix_name());
            accessor->put_file(path, "");
        }
    }
    return 0;
}
| |
// Writes a tmp rowset KV (keyed by txn_id rather than version) for `rowset`
// and uploads its segment and inverted index files. When `write_schema_kv` is
// true the schema is stripped from the serialized meta and written under a
// separate meta_schema_key. `is_inverted_idx_v2` selects the v2 inverted
// index path layout instead of v1. Returns 0 on success, -1 on KV failure.
static int create_tmp_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor,
                             const doris::RowsetMetaCloudPB& rowset, bool write_schema_kv,
                             bool is_inverted_idx_v2 = false) {
    std::string key, val;
    meta_rowset_tmp_key({instance_id, rowset.txn_id(), rowset.tablet_id()}, &key);
    if (write_schema_kv) {
        // Detach the schema: serialize a copy of the meta without it.
        auto rowset_copy = rowset;
        rowset_copy.clear_tablet_schema();
        rowset_copy.SerializeToString(&val);
    } else {
        rowset.SerializeToString(&val);
    }
    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    txn->put(key, val);
    std::string schema_key, schema_val;
    if (write_schema_kv) {
        meta_schema_key({instance_id, rowset.index_id(), rowset.schema_version()}, &schema_key);
        rowset.tablet_schema().SerializeToString(&schema_val);
        txn->put(schema_key, schema_val);
    }
    if (txn->commit() != TxnErrorCode::TXN_OK) {
        return -1;
    }

    // Upload one object per segment and per inverted index. Unlike
    // create_recycle_rowset(), the object content here is the path string
    // itself rather than empty — presumably to make objects distinguishable
    // in assertions; confirm against the tests that read them back.
    for (int i = 0; i < rowset.num_segments(); ++i) {
        auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i);
        accessor->put_file(path, path);
        for (auto& index : rowset.tablet_schema().index()) {
            std::string path;
            if (!is_inverted_idx_v2) {
                path = inverted_index_path_v1(rowset.tablet_id(), rowset.rowset_id_v2(), i,
                                              index.index_id(), index.index_suffix_name());
            } else {
                path = inverted_index_path_v2(rowset.tablet_id(), rowset.rowset_id_v2(), i);
            }
            accessor->put_file(path, path);
        }
    }
    return 0;
}
| |
| static int create_committed_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, |
| const std::string& resource_id, int64_t tablet_id, |
| int64_t version, int num_segments = 1, |
| int num_inverted_indexes = 1) { |
| std::string key; |
| std::string val; |
| |
| auto rowset_id = next_rowset_id(); |
| MetaRowsetKeyInfo key_info {instance_id, tablet_id, version}; |
| meta_rowset_key(key_info, &key); |
| |
| doris::RowsetMetaCloudPB rowset_pb; |
| rowset_pb.set_rowset_id(0); // useless but required |
| rowset_pb.set_rowset_id_v2(rowset_id); |
| rowset_pb.set_num_segments(num_segments); |
| rowset_pb.set_tablet_id(tablet_id); |
| rowset_pb.set_resource_id(resource_id); |
| rowset_pb.set_creation_time(current_time); |
| if (num_inverted_indexes > 0) { |
| auto schema = rowset_pb.mutable_tablet_schema(); |
| for (int i = 0; i < num_inverted_indexes; ++i) { |
| schema->add_index()->set_index_id(i); |
| } |
| } |
| rowset_pb.SerializeToString(&val); |
| |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(key, val); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| |
| for (int i = 0; i < num_segments; ++i) { |
| auto path = segment_path(tablet_id, rowset_id, i); |
| accessor->put_file(path, ""); |
| for (int j = 0; j < num_inverted_indexes; ++j) { |
| auto path = inverted_index_path_v1(tablet_id, rowset_id, i, j, ""); |
| accessor->put_file(path, ""); |
| } |
| } |
| return 0; |
| } |
| |
| static int create_committed_rowset_with_rowset_id(TxnKv* txn_kv, StorageVaultAccessor* accessor, |
| const std::string& resource_id, int64_t tablet_id, |
| int64_t start_version, int64_t end_version, |
| std::string rowset_id, bool segments_overlap, |
| int num_segments) { |
| std::string key; |
| std::string val; |
| |
| MetaRowsetKeyInfo key_info {instance_id, tablet_id, end_version}; |
| meta_rowset_key(key_info, &key); |
| |
| doris::RowsetMetaCloudPB rowset_pb; |
| rowset_pb.set_rowset_id(0); // useless but required |
| rowset_pb.set_rowset_id_v2(rowset_id); |
| rowset_pb.set_num_segments(num_segments); |
| rowset_pb.set_tablet_id(tablet_id); |
| rowset_pb.set_resource_id(resource_id); |
| rowset_pb.set_creation_time(current_time); |
| rowset_pb.set_start_version(start_version); |
| rowset_pb.set_end_version(end_version); |
| rowset_pb.set_segments_overlap_pb(segments_overlap ? OVERLAPPING : NONOVERLAPPING); |
| rowset_pb.SerializeToString(&val); |
| |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(key, val); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| |
| for (int i = 0; i < num_segments; ++i) { |
| auto path = segment_path(tablet_id, rowset_id, i); |
| accessor->put_file(path, ""); |
| } |
| return 0; |
| } |
| |
| static void create_delete_bitmaps(Transaction* txn, int64_t tablet_id, std::string rowset_id, |
| int64_t start_version, int64_t end_version) { |
| for (int64_t ver {start_version}; ver <= end_version; ver++) { |
| auto key = meta_delete_bitmap_key({instance_id, tablet_id, rowset_id, ver, 0}); |
| std::string val {"test_data"}; |
| txn->put(key, val); |
| } |
| } |
| |
// Creates the full KV footprint of a tablet: the tablet meta KV, stats KV and
// job KV (the latter two reuse the serialized meta bytes because only the
// key's existence matters here), plus the tablet-index KV mapping tablet_id
// back to (db, table, index, partition). `is_mow` marks the tablet as
// merge-on-write. Returns 0 on success, -1 on KV transaction failure.
static int create_tablet(TxnKv* txn_kv, int64_t table_id, int64_t index_id, int64_t partition_id,
                         int64_t tablet_id, bool is_mow = false) {
    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    doris::TabletMetaCloudPB tablet_meta;
    tablet_meta.set_tablet_id(tablet_id);
    tablet_meta.set_enable_unique_key_merge_on_write(is_mow);
    auto val = tablet_meta.SerializeAsString();
    auto key = meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
    txn->put(key, val);
    key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
    txn->put(key, val); // val is not necessary
    key = job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
    txn->put(key, val); // val is not necessary

    // mock tablet index
    TabletIndexPB tablet_idx_pb;
    tablet_idx_pb.set_db_id(db_id);
    tablet_idx_pb.set_table_id(table_id);
    tablet_idx_pb.set_index_id(index_id);
    tablet_idx_pb.set_partition_id(partition_id);
    tablet_idx_pb.set_tablet_id(tablet_id);
    auto idx_val = tablet_idx_pb.SerializeAsString();
    key = meta_tablet_idx_key({instance_id, tablet_id});
    txn->put(key, idx_val);
    LOG(INFO) << "tablet_idx_pb:" << tablet_idx_pb.DebugString() << " key=" << hex(key);

    if (txn->commit() != TxnErrorCode::TXN_OK) {
        return -1;
    }
    return 0;
}
| |
| static int create_recycle_partiton(TxnKv* txn_kv, int64_t table_id, int64_t partition_id, |
| const std::vector<int64_t>& index_ids) { |
| std::string key; |
| std::string val; |
| |
| RecyclePartKeyInfo key_info {instance_id, partition_id}; |
| recycle_partition_key(key_info, &key); |
| |
| RecyclePartitionPB partition_pb; |
| partition_pb.set_db_id(db_id); |
| partition_pb.set_table_id(table_id); |
| for (auto index_id : index_ids) { |
| partition_pb.add_index_id(index_id); |
| } |
| partition_pb.set_creation_time(current_time); |
| partition_pb.set_state(RecyclePartitionPB::DROPPED); |
| partition_pb.SerializeToString(&val); |
| |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(key, val); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| return 0; |
| } |
| |
| static int create_partition_version_kv(TxnKv* txn_kv, int64_t table_id, int64_t partition_id) { |
| auto key = partition_version_key({instance_id, db_id, table_id, partition_id}); |
| VersionPB version; |
| version.set_version(1); |
| auto val = version.SerializeAsString(); |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(key, val); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| return 0; |
| } |
| |
// Writes a delete-bitmap update lock KV for `table_id` plus a
// MowTabletCompactionPB KV carrying `expiration` for (table_id, initiator).
// Note: only the compaction KV records the expiration; the lock KV stores
// just the lock_id. The -1 in the lock key is presumably a table-scope
// marker — confirm against meta_delete_bitmap_update_lock_key's convention.
// Returns 0 on success, -1 on KV transaction failure.
static int create_delete_bitmap_update_lock_kv(TxnKv* txn_kv, int64_t table_id, int64_t lock_id,
                                               int64_t initiator, int64_t expiration) {
    auto key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
    DeleteBitmapUpdateLockPB lock_info;
    lock_info.set_lock_id(lock_id);
    auto val = lock_info.SerializeAsString();
    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    txn->put(key, val);
    std::string tablet_compaction_key =
            mow_tablet_compaction_key({instance_id, table_id, initiator});
    std::string tablet_compaction_val;
    MowTabletCompactionPB mow_tablet_compaction;
    mow_tablet_compaction.set_expiration(expiration);
    mow_tablet_compaction.SerializeToString(&tablet_compaction_val);
    txn->put(tablet_compaction_key, tablet_compaction_val);
    if (txn->commit() != TxnErrorCode::TXN_OK) {
        return -1;
    }
    return 0;
}
| |
| static int create_table_version_kv(TxnKv* txn_kv, int64_t table_id) { |
| auto key = table_version_key({instance_id, db_id, table_id}); |
| std::string val(sizeof(int64_t), 0); |
| *reinterpret_cast<int64_t*>(val.data()) = (int64_t)1; |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(key, val); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| return 0; |
| } |
| |
| static int create_txn_label_kv(TxnKv* txn_kv, std::string label, int64_t db_id) { |
| std::string txn_label_key_; |
| std::string txn_label_val; |
| auto keyinfo = TxnLabelKeyInfo({instance_id, db_id, label}); |
| txn_label_key(keyinfo, &txn_label_key_); |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(txn_label_key_, label); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| return 0; |
| } |
| |
| static int create_recycle_index(TxnKv* txn_kv, int64_t table_id, int64_t index_id) { |
| std::string key; |
| std::string val; |
| |
| RecycleIndexKeyInfo key_info {instance_id, index_id}; |
| recycle_index_key(key_info, &key); |
| |
| RecycleIndexPB index_pb; |
| |
| index_pb.set_table_id(table_id); |
| index_pb.set_creation_time(current_time); |
| index_pb.set_state(RecycleIndexPB::DROPPED); |
| index_pb.SerializeToString(&val); |
| |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn->put(key, val); |
| if (txn->commit() != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| return 0; |
| } |
| |
// Reads and parses the TxnInfoPB for (db_id, txn_id) in a fresh transaction.
// Return codes: 0 ok, -1 create_txn failed, -2 key missing / get failed,
// -3 protobuf parse failed, -4 commit failed.
// Note: the `instance_id` parameter shadows the file-level global on purpose
// so callers can probe other instances.
static int get_txn_info(std::shared_ptr<TxnKv> txn_kv, std::string instance_id, int64_t db_id,
                        int64_t txn_id, TxnInfoPB& txn_info_pb) {
    std::string txn_inf_key;
    std::string txn_inf_val;
    TxnInfoKeyInfo txn_inf_key_info {instance_id, db_id, txn_id};

    LOG(INFO) << instance_id << "|" << db_id << "|" << txn_id;

    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    txn_info_key(txn_inf_key_info, &txn_inf_key);
    LOG(INFO) << "txn_inf_key:" << hex(txn_inf_key);
    TxnErrorCode err = txn->get(txn_inf_key, &txn_inf_val);
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "txn->get failed, err=" << err;
        return -2;
    }

    if (!txn_info_pb.ParseFromString(txn_inf_val)) {
        LOG(WARNING) << "ParseFromString failed";
        return -3;
    }
    LOG(INFO) << "txn_info_pb" << txn_info_pb.DebugString();
    err = txn->commit();
    if (err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "txn->commit failed, err=" << err;
        return -4;
    }
    return 0;
}
| |
| static int check_recycle_txn_keys(std::shared_ptr<TxnKv> txn_kv, std::string instance_id, |
| int64_t db_id, int64_t txn_id, const std::string& label) { |
| std::string txn_inf_key; |
| std::string txn_inf_val; |
| TxnInfoKeyInfo txn_inf_key_info {instance_id, db_id, txn_id}; |
| |
| LOG(INFO) << instance_id << "|" << db_id << "|" << txn_id; |
| |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| txn_info_key(txn_inf_key_info, &txn_inf_key); |
| TxnErrorCode err = txn->get(txn_inf_key, &txn_inf_val); |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return -2; |
| } |
| |
| std::string label_key, label_val; |
| txn_label_key({instance_id, db_id, label}, &label_key); |
| err = txn->get(label_key, &label_val); |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return -3; |
| } |
| |
| std::string index_key, index_val; |
| index_key = txn_index_key({instance_id, txn_id}); |
| err = txn->get(index_key, &index_val); |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return -4; |
| } |
| |
| std::string running_key; |
| std::string running_value; |
| TxnRunningKeyInfo running_key_info {instance_id, db_id, txn_id}; |
| txn_running_key(running_key_info, &running_key); |
| err = txn->get(running_key, &running_value); |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return -5; |
| } |
| |
| std::string rec_txn_key; |
| std::string rec_txn_val; |
| RecycleTxnKeyInfo recycle_txn_key_info {instance_id, db_id, txn_id}; |
| recycle_txn_key(recycle_txn_key_info, &rec_txn_key); |
| err = txn->get(rec_txn_key, &rec_txn_val); |
| if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return -6; |
| } |
| |
| return 0; |
| } |
| |
| static int create_instance(const std::string& internal_stage_id, |
| const std::string& external_stage_id, InstanceInfoPB& instance_info) { |
| // create internal stage |
| { |
| std::string s3_prefix = "internal_prefix"; |
| std::string stage_prefix = fmt::format("{}/stage/root/{}/", s3_prefix, internal_stage_id); |
| ObjectStoreInfoPB object_info; |
| object_info.set_id(internal_stage_id); // test use accessor_map_ in recycle |
| |
| StagePB internal_stage; |
| internal_stage.set_type(StagePB::INTERNAL); |
| internal_stage.set_stage_id(internal_stage_id); |
| ObjectStoreInfoPB internal_object_info; |
| internal_object_info.set_prefix(stage_prefix); |
| internal_object_info.set_id("0"); |
| internal_stage.mutable_obj_info()->CopyFrom(internal_object_info); |
| |
| instance_info.add_obj_info()->CopyFrom(object_info); |
| instance_info.add_stages()->CopyFrom(internal_stage); |
| } |
| |
| // create external stage |
| { |
| ObjectStoreInfoPB object_info; |
| object_info.set_id(external_stage_id); |
| |
| StagePB external_stage; |
| external_stage.set_type(StagePB::EXTERNAL); |
| external_stage.set_stage_id(external_stage_id); |
| external_stage.mutable_obj_info()->CopyFrom(object_info); |
| |
| instance_info.add_obj_info()->CopyFrom(object_info); |
| instance_info.add_stages()->CopyFrom(external_stage); |
| } |
| |
| instance_info.set_instance_id(instance_id); |
| return 0; |
| } |
| |
// Writes one copy job KV (copy id fixed to "copy_id", group 0) together with
// one copy-file KV per entry of `object_files`, all in a single transaction.
// start_time / finish_time are only stamped on the job when non-zero.
// Returns 0 on success, -1 on KV transaction failure.
static int create_copy_job(TxnKv* txn_kv, const std::string& stage_id, int64_t table_id,
                           StagePB::StageType stage_type, CopyJobPB::JobStatus job_status,
                           std::vector<ObjectFilePB> object_files, int64_t timeout_time,
                           int64_t start_time = 0, int64_t finish_time = 0) {
    std::string key;
    std::string val;
    CopyJobKeyInfo key_info {instance_id, stage_id, table_id, "copy_id", 0};
    copy_job_key(key_info, &key);

    CopyJobPB copy_job;
    copy_job.set_stage_type(stage_type);
    copy_job.set_job_status(job_status);
    copy_job.set_timeout_time_ms(timeout_time);
    if (start_time != 0) {
        copy_job.set_start_time_ms(start_time);
    }
    if (finish_time != 0) {
        copy_job.set_finish_time_ms(finish_time);
    }
    for (const auto& file : object_files) {
        copy_job.add_object_files()->CopyFrom(file);
    }
    copy_job.SerializeToString(&val);

    // All copy-file KVs share the same value (copy id + group id).
    std::vector<std::string> file_keys;
    std::string file_val;
    CopyFilePB copy_file;
    copy_file.set_copy_id("copy_id");
    copy_file.set_group_id(0);
    file_val = copy_file.SerializeAsString();

    // create job files
    for (const auto& file : object_files) {
        CopyFileKeyInfo file_info {instance_id, stage_id, table_id, file.relative_path(),
                                   file.etag()};
        std::string file_key;
        copy_file_key(file_info, &file_key);
        file_keys.push_back(file_key);
    }

    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    txn->put(key, val);
    for (const auto& file_key : file_keys) {
        txn->put(file_key, file_val);
    }
    if (txn->commit() != TxnErrorCode::TXN_OK) {
        return -1;
    }
    return 0;
}
| |
| static int copy_job_exists(TxnKv* txn_kv, const std::string& stage_id, int64_t table_id, |
| bool* exist) { |
| std::string key; |
| std::string val; |
| CopyJobKeyInfo key_info {instance_id, stage_id, table_id, "copy_id", 0}; |
| copy_job_key(key_info, &key); |
| |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return -1; |
| } |
| TxnErrorCode err = txn->get(key, &val); |
| if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
| return -1; |
| } |
| *exist = err == TxnErrorCode::TXN_OK; |
| return 0; |
| } |
| |
| static int create_object_files(StorageVaultAccessor* accessor, |
| std::vector<ObjectFilePB>* object_files) { |
| for (auto& file : *object_files) { |
| auto key = file.relative_path(); |
| if (accessor->put_file(key, "") != 0) { |
| return -1; |
| } |
| file.set_etag(""); |
| } |
| return 0; |
| } |
| |
// Counts all copy-file KVs of `stage_id` that belong to `table_id` by range
// scanning [key(table_id, "", ""), key(table_id + 1, "", "")).
// NOTE(review): when it->more() is true the next scan restarts from the
// ORIGINAL begin key plus '\x00' rather than from the last returned key —
// presumably fine for single-batch test data, but verify against the
// RangeGetIterator next-begin-key convention if batches can be truncated.
// On success returns 0 and stores the count in *file_num; -1 on KV error.
static int get_copy_file_num(TxnKv* txn_kv, const std::string& stage_id, int64_t table_id,
                             int* file_num) {
    *file_num = 0;
    std::string key0;
    std::string key1;
    CopyFileKeyInfo key_info0 {instance_id, stage_id, table_id, "", ""};
    CopyFileKeyInfo key_info1 {instance_id, stage_id, table_id + 1, "", ""};
    copy_file_key(key_info0, &key0);
    copy_file_key(key_info1, &key1);

    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        return -1;
    }
    std::unique_ptr<RangeGetIterator> it;
    do {
        if (txn->get(key0, key1, &it) != TxnErrorCode::TXN_OK) {
            return -1;
        }
        while (it->has_next()) {
            it->next();
            ++(*file_num);
        }
        key0.push_back('\x00');
    } while (it->more());
    return 0;
}
| |
// Recycling an instance that has no recycle-rowset KVs at all must succeed
// and be a no-op.
TEST(RecyclerTest, recycle_empty) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    // Minimal instance with a single object-store info.
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_empty");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_empty");

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    ASSERT_EQ(recycler.recycle_rowsets(), 0);
}
| |
// End-to-end recycle_rowsets(): seeds 2000 recycle-rowset KVs (covering every
// RecycleRowsetPB type plus detached-schema variants), runs the recycler, and
// verifies both the object store and the KV range are emptied. Sync points
// count InvertedIndexIdCache insertions to check schema handling.
TEST(RecyclerTest, recycle_rowsets) {
    config::retention_seconds = 0;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_rowsets");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_rowsets");

    config::instance_recycler_worker_pool_size = 1;

    // Sync-point counters: insert1 fires for schemas without inverted
    // indexes, insert2 for schemas that have them.
    int insert_no_inverted_index = 0;
    int insert_inverted_index = 0;
    auto sp = SyncPoint::get_instance();
    // Deferred cleanup so callbacks never leak into other tests.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
    sp->set_call_back("InvertedIndexIdCache::insert1", [&](auto&&) { ++insert_no_inverted_index; });
    sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; });
    sp->enable_processing();

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    // Five schema versions: version i carries i inverted indexes (v1 format),
    // so version 0 has none.
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    auto accessor = recycler.accessor_map_.begin()->second;
    constexpr int index_id = 10001, tablet_id = 10002;
    // First batch cycles through every RecycleRowsetPB type (including
    // UNKNOWN), alternating detached-schema KVs.
    for (int i = 0; i < 1000; ++i) {
        auto rowset = create_rowset("recycle_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
        create_recycle_rowset(
                txn_kv.get(), accessor.get(), rowset,
                static_cast<RecycleRowsetPB::Type>(i % (RecycleRowsetPB::Type_MAX + 1)), i & 1);
    }
    // Second batch: all COMPACT with detached schema KVs.
    for (int i = 0; i < 1000; ++i) {
        auto rowset = create_rowset("recycle_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
        create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, RecycleRowsetPB::COMPACT, true);
    }

    ASSERT_EQ(recycler.recycle_rowsets(), 0);

    // check rowset does not exist on obj store
    std::unique_ptr<ListIterator> list_iter;
    ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
    EXPECT_FALSE(list_iter->has_next());
    // check all recycle rowset kv have been deleted
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::unique_ptr<RangeGetIterator> it;
    auto begin_key = recycle_key_prefix(instance_id);
    auto end_key = recycle_key_prefix(instance_id + '\xff');
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    EXPECT_EQ(it->size(), 0);
    // Check InvertedIndexIdCache: one distinct schema without indexes, four
    // distinct (index_id, schema_version) combos with indexes.
    EXPECT_EQ(insert_inverted_index, 4);
    EXPECT_EQ(insert_no_inverted_index, 1);
}
| |
// Stress variant of recycle_rowsets with a 10-worker pool: sync points slow
// down KV gets and mock-accessor deletes so that check_recycle_tasks() can
// observe an in-flight long-running recycle task; afterwards no task should
// remain and store + KV must be empty.
TEST(RecyclerTest, bench_recycle_rowsets) {
    config::retention_seconds = 0;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_rowsets");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_rowsets");

    // threshold = 0 makes every running task count as "long-running".
    config::instance_recycler_worker_pool_size = 10;
    config::recycle_task_threshold_seconds = 0;
    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    auto sp = SyncPoint::get_instance();
    // Deferred cleanup so callbacks never leak into other tests.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
    // Shrink KV batches and add latency so the recycle task stays observable.
    sp->set_call_back("memkv::Transaction::get", [](auto&& args) {
        auto* limit = try_any_cast<int*>(args[0]);
        *limit = 100;
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    });
    // While deletes are stalled, the in-flight task must be reported.
    sp->set_call_back("MockAccessor::delete_files", [&](auto&& args) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        bool found = recycler.check_recycle_tasks();
        ASSERT_EQ(found, true);
    });
    sp->set_call_back("MockAccessor::delete_prefix",
                      [&](auto&&) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); });
    sp->enable_processing();

    // Five schema versions: version i carries i inverted indexes (v1 format).
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    auto accessor = recycler.accessor_map_.begin()->second;
    constexpr int index_id = 10001, tablet_id = 10002;
    // 20% PREPARE / 80% COMPACT, alternating detached-schema KVs.
    for (int i = 0; i < 2000; ++i) {
        auto rowset = create_rowset("recycle_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
        create_recycle_rowset(txn_kv.get(), accessor.get(), rowset,
                              i % 10 < 2 ? RecycleRowsetPB::PREPARE : RecycleRowsetPB::COMPACT,
                              i & 1);
    }

    ASSERT_EQ(recycler.recycle_rowsets(), 0);
    // After completion no recycle task may be left registered.
    ASSERT_EQ(recycler.check_recycle_tasks(), false);

    // check rowset does not exist on obj store
    std::unique_ptr<ListIterator> list_iter;
    ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter));
    ASSERT_FALSE(list_iter->has_next());
    // check all recycle rowset kv have been deleted
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::unique_ptr<RangeGetIterator> it;
    auto begin_key = recycle_key_prefix(instance_id);
    auto end_key = recycle_key_prefix(instance_id + '\xff');
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
}
| |
| TEST(RecyclerTest, recycle_tmp_rowsets) { |
| config::retention_seconds = 0; |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("recycle_tmp_rowsets"); |
| obj_info->set_ak(config::test_s3_ak); |
| obj_info->set_sk(config::test_s3_sk); |
| obj_info->set_endpoint(config::test_s3_endpoint); |
| obj_info->set_region(config::test_s3_region); |
| obj_info->set_bucket(config::test_s3_bucket); |
| obj_info->set_prefix("recycle_tmp_rowsets"); |
| |
| int insert_no_inverted_index = 0; |
| int insert_inverted_index = 0; |
| auto sp = SyncPoint::get_instance(); |
| std::unique_ptr<int, std::function<void(int*)>> defer( |
| (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); }); |
| sp->set_call_back("InvertedIndexIdCache::insert1", [&](auto&&) { ++insert_no_inverted_index; }); |
| sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; }); |
| sp->enable_processing(); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| std::vector<doris::TabletSchemaCloudPB> schemas; |
| for (int i = 0; i < 5; ++i) { |
| auto& schema = schemas.emplace_back(); |
| schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); |
| schema.set_schema_version(i); |
| for (int j = 0; j < i; ++j) { |
| auto index = schema.add_index(); |
| index->set_index_id(j); |
| index->set_index_type(IndexType::INVERTED); |
| } |
| } |
| |
| auto accessor = recycler.accessor_map_.begin()->second; |
| int64_t txn_id_base = 114115; |
| int64_t tablet_id_base = 10015; |
| int64_t index_id_base = 1000; |
| for (int i = 0; i < 100; ++i) { |
| int64_t txn_id = txn_id_base + i; |
| for (int j = 0; j < 20; ++j) { |
| auto rowset = create_rowset("recycle_tmp_rowsets", tablet_id_base + j, |
| index_id_base + j % 4, 5, schemas[i % 5], txn_id); |
| create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, i & 1); |
| } |
| } |
| |
| ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); |
| |
| // check rowset does not exist on obj store |
| std::unique_ptr<ListIterator> list_iter; |
| ASSERT_EQ(0, accessor->list_directory("data/", &list_iter)); |
| ASSERT_FALSE(list_iter->has_next()); |
| // check all tmp rowset kv have been deleted |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> it; |
| auto begin_key = meta_rowset_tmp_key({instance_id, 0, 0}); |
| auto end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // Check InvertedIndexIdCache |
| EXPECT_EQ(insert_inverted_index, 16); |
| EXPECT_EQ(insert_no_inverted_index, 4); |
| } |
| |
| TEST(RecyclerTest, recycle_tmp_rowsets_partial_update) { |
| config::retention_seconds = 0; |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("recycle_tmp_rowsets_partial_update"); |
| obj_info->set_ak(config::test_s3_ak); |
| obj_info->set_sk(config::test_s3_sk); |
| obj_info->set_endpoint(config::test_s3_endpoint); |
| obj_info->set_region(config::test_s3_region); |
| obj_info->set_bucket(config::test_s3_bucket); |
| obj_info->set_prefix("recycle_tmp_rowsets_partial_update"); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| doris::TabletSchemaCloudPB schema; |
| |
| auto accessor = recycler.accessor_map_.begin()->second; |
| int64_t tablet_id = 10015; |
| int64_t index_id = 1000; |
| int64_t txn_id_base = 293039; |
| for (int j = 0; j < 20; ++j) { |
| int64_t txn_id = txn_id_base + j; |
| int segment_num = 5; |
| if (j < 15) { |
| auto rowset = create_rowset("recycle_tmp_rowsets_partial_update", tablet_id, index_id, |
| segment_num, schema, RowsetStatePB::VISIBLE, txn_id); |
| create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false); |
| } else { |
| auto rowset = |
| create_rowset("recycle_tmp_rowsets_partial_update", tablet_id, tablet_id, |
| segment_num, schema, RowsetStatePB::BEGIN_PARTIAL_UPDATE, txn_id); |
| create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false); |
| |
| // partial update may write new segment to an existing tmp rowsets |
| // we simulate that partial update load fails after it writes a segment |
| // and before it updates the segments num in tmp rowset meta |
| int extra_segment_id = segment_num; |
| auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), extra_segment_id); |
| accessor->put_file(path, path); |
| } |
| } |
| |
| ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); |
| // check rowset does not exist on obj store |
| std::unique_ptr<ListIterator> list_iter; |
| ASSERT_EQ(0, accessor->list_directory("data/", &list_iter)); |
| ASSERT_FALSE(list_iter->has_next()); |
| // check all tmp rowset kv have been deleted |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> it; |
| auto begin_key = meta_rowset_tmp_key({instance_id, 0, 0}); |
| auto end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| } |
| |
| TEST(RecyclerTest, recycle_tablet) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("recycle_tablet"); |
| obj_info->set_ak(config::test_s3_ak); |
| obj_info->set_sk(config::test_s3_sk); |
| obj_info->set_endpoint(config::test_s3_endpoint); |
| obj_info->set_region(config::test_s3_region); |
| obj_info->set_bucket(config::test_s3_bucket); |
| obj_info->set_prefix("recycle_tablet"); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| std::vector<doris::TabletSchemaCloudPB> schemas; |
| for (int i = 0; i < 5; ++i) { |
| auto& schema = schemas.emplace_back(); |
| schema.set_schema_version(i); |
| for (int j = 0; j < i; ++j) { |
| auto index = schema.add_index(); |
| index->set_index_id(j); |
| index->set_index_type(IndexType::INVERTED); |
| } |
| } |
| |
| constexpr int table_id = 10000, index_id = 10001, partition_id = 10002, tablet_id = 10003; |
| auto accessor = recycler.accessor_map_.begin()->second; |
| create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id); |
| for (int i = 0; i < 500; ++i) { |
| auto rowset = create_rowset("recycle_tablet", tablet_id, index_id, 5, schemas[i % 5]); |
| create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, |
| i % 10 < 2 ? RecycleRowsetPB::PREPARE : RecycleRowsetPB::COMPACT, |
| i & 1); |
| } |
| for (int i = 0; i < 500; ++i) { |
| create_committed_rowset(txn_kv.get(), accessor.get(), "recycle_tablet", tablet_id, i); |
| } |
| |
| ASSERT_EQ(create_partition_version_kv(txn_kv.get(), table_id, partition_id), 0); |
| |
| ASSERT_EQ(0, recycler.recycle_tablets(table_id, index_id)); |
| |
| // check rowset does not exist on s3 |
| std::unique_ptr<ListIterator> list_iter; |
| ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(tablet_id), &list_iter)); |
| ASSERT_FALSE(list_iter->has_next()); |
| // check all related kv have been deleted |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> it; |
| // meta_tablet_key, meta_tablet_idx_key, meta_rowset_key |
| auto begin_key = meta_key_prefix(instance_id); |
| auto end_key = meta_key_prefix(instance_id + '\xff'); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // job_tablet_key |
| begin_key = job_tablet_key({instance_id, table_id, 0, 0, 0}); |
| end_key = job_tablet_key({instance_id, table_id + 1, 0, 0, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // stats_tablet_key |
| begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0}); |
| end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // recycle_rowset_key |
| begin_key = recycle_key_prefix(instance_id); |
| end_key = recycle_key_prefix(instance_id + '\xff'); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| } |
| |
// Drops an index and verifies InstanceRecycler::recycle_indexes() removes all
// tablets under it: rowset data on the object store and every tablet-scoped
// KV (meta, schema, job, stats, recycle-rowset). Tmp rowsets are asserted to
// survive index recycling; a follow-up recycle_tmp_rowsets() reclaims them.
TEST(RecyclerTest, recycle_indexes) {
    config::retention_seconds = 0;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    // Instance with a single S3-style storage vault named after the test.
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_indexes");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_indexes");

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    // Schema version i carries i inverted indexes (V1 storage format).
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    constexpr int table_id = 10000, index_id = 10001, partition_id = 10002;
    auto accessor = recycler.accessor_map_.begin()->second;
    int64_t tablet_id_base = 10100;
    int64_t txn_id_base = 114115;
    // 100 tablets; each gets 10 recycle-rowset KVs, 10 tmp rowsets and 10
    // committed rowsets.
    for (int i = 0; i < 100; ++i) {
        int64_t tablet_id = tablet_id_base + i;
        create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id);
        for (int j = 0; j < 10; ++j) {
            // NOTE(review): resource id "recycle_tablet" differs from this
            // instance's vault id ("recycle_indexes"); the test still passes
            // its assertions, so this appears intentional — confirm against
            // create_recycle_rowset / the recycler's accessor lookup.
            auto rowset = create_rowset("recycle_tablet", tablet_id, index_id, 5, schemas[j % 5]);
            create_recycle_rowset(txn_kv.get(), accessor.get(), rowset,
                                  j % 10 < 2 ? RecycleRowsetPB::PREPARE : RecycleRowsetPB::COMPACT,
                                  j & 1);
            auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5,
                                            schemas[j % 5], txn_id_base + j);
            // Tmp rowsets get the real vault id explicitly — presumably so the
            // later recycle_tmp_rowsets() pass can resolve their accessor.
            tmp_rowset.set_resource_id("recycle_indexes");
            create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1);
        }
        for (int j = 0; j < 10; ++j) {
            create_committed_rowset(txn_kv.get(), accessor.get(), "recycle_indexes", tablet_id, j);
        }
    }

    ASSERT_EQ(create_partition_version_kv(txn_kv.get(), table_id, partition_id), 0);
    // Mark the index dropped, then recycle it.
    create_recycle_index(txn_kv.get(), table_id, index_id);
    ASSERT_EQ(recycler.recycle_indexes(), 0);

    // check rowset does not exist on s3
    std::unique_ptr<ListIterator> list_iter;
    ASSERT_EQ(0, accessor->list_directory("data/", &list_iter));
    ASSERT_FALSE(list_iter->has_next());
    // check all related kv have been deleted
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::unique_ptr<RangeGetIterator> it;
    // meta_rowset_key
    auto begin_key = meta_rowset_key({instance_id, 0, 0});
    auto end_key = meta_rowset_key({instance_id, INT64_MAX, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
    // meta_rowset_tmp_key: all 100 tablets x 10 tmp rowsets (= 1000) survive;
    // recycle_indexes() does not touch tmp rowsets.
    begin_key = meta_rowset_tmp_key({instance_id, 0, 0});
    end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 1000);
    // meta_tablet_idx_key
    begin_key = meta_tablet_idx_key({instance_id, 0});
    end_key = meta_tablet_idx_key({instance_id, INT64_MAX});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
    // meta_tablet_key
    begin_key = meta_tablet_key({instance_id, 0, 0, 0, 0});
    end_key = meta_tablet_key({instance_id, INT64_MAX, 0, 0, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
    // meta_schema_key
    begin_key = meta_schema_key({instance_id, 0, 0});
    end_key = meta_schema_key({instance_id, INT64_MAX, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
    // job_tablet_key
    begin_key = job_tablet_key({instance_id, table_id, 0, 0, 0});
    end_key = job_tablet_key({instance_id, table_id + 1, 0, 0, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
    // stats_tablet_key
    begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0});
    end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
    // recycle_rowset_key
    begin_key = recycle_key_prefix(instance_id);
    end_key = recycle_key_prefix(instance_id + '\xff');
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    // Test recycle tmp rowsets after recycle indexes: the 1000 surviving tmp
    // rowset KVs must now be reclaimed.
    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    begin_key = meta_rowset_tmp_key({instance_id, 0, 0});
    end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
}
| |
| TEST(RecyclerTest, recycle_partitions) { |
| config::retention_seconds = 0; |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("recycle_partitions"); |
| obj_info->set_ak(config::test_s3_ak); |
| obj_info->set_sk(config::test_s3_sk); |
| obj_info->set_endpoint(config::test_s3_endpoint); |
| obj_info->set_region(config::test_s3_region); |
| obj_info->set_bucket(config::test_s3_bucket); |
| obj_info->set_prefix("recycle_partitions"); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| |
| std::vector<doris::TabletSchemaCloudPB> schemas; |
| for (int i = 0; i < 5; ++i) { |
| auto& schema = schemas.emplace_back(); |
| schema.set_schema_version(i); |
| schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); |
| for (int j = 0; j < i; ++j) { |
| auto index = schema.add_index(); |
| index->set_index_id(j); |
| index->set_index_type(IndexType::INVERTED); |
| } |
| } |
| |
| constexpr int table_id = 10000, partition_id = 30020; |
| auto accessor = recycler.accessor_map_.begin()->second; |
| std::vector<int64_t> index_ids {20200, 20201, 20202, 20203, 20204}; |
| |
| int64_t tablet_id_base = 10100; |
| for (auto index_id : index_ids) { |
| for (int i = 0; i < 20; ++i) { |
| int64_t tablet_id = tablet_id_base++; |
| ASSERT_EQ(create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id), 0); |
| for (int j = 0; j < 10; ++j) { |
| auto rowset = |
| create_rowset("recycle_tablet", tablet_id, index_id, 5, schemas[j % 5]); |
| create_recycle_rowset( |
| txn_kv.get(), accessor.get(), rowset, |
| j % 10 < 2 ? RecycleRowsetPB::PREPARE : RecycleRowsetPB::COMPACT, j & 1); |
| } |
| for (int j = 0; j < 10; ++j) { |
| create_committed_rowset(txn_kv.get(), accessor.get(), "recycle_partitions", |
| tablet_id, j); |
| } |
| } |
| } |
| |
| ASSERT_EQ(create_partition_version_kv(txn_kv.get(), table_id, partition_id), 0); |
| |
| create_recycle_partiton(txn_kv.get(), table_id, partition_id, index_ids); |
| ASSERT_EQ(recycler.recycle_partitions(), 0); |
| |
| // check rowset does not exist on s3 |
| std::unique_ptr<ListIterator> list_iter; |
| ASSERT_EQ(0, accessor->list_directory("data/", &list_iter)); |
| ASSERT_FALSE(list_iter->has_next()); |
| // check all related kv have been deleted |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::unique_ptr<RangeGetIterator> it; |
| // meta_rowset_key |
| auto begin_key = meta_rowset_key({instance_id, 0, 0}); |
| auto end_key = meta_rowset_key({instance_id, INT64_MAX, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // meta_rowset_tmp_key |
| begin_key = meta_rowset_tmp_key({instance_id, 0, 0}); |
| end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // meta_tablet_idx_key |
| begin_key = meta_tablet_idx_key({instance_id, 0}); |
| end_key = meta_tablet_idx_key({instance_id, INT64_MAX}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // meta_tablet_key |
| begin_key = meta_tablet_key({instance_id, 0, 0, 0, 0}); |
| end_key = meta_tablet_key({instance_id, INT64_MAX, 0, 0, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // job_tablet_key |
| begin_key = job_tablet_key({instance_id, table_id, 0, 0, 0}); |
| end_key = job_tablet_key({instance_id, table_id + 1, 0, 0, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // stats_tablet_key |
| begin_key = stats_tablet_key({instance_id, table_id, 0, 0, 0}); |
| end_key = stats_tablet_key({instance_id, table_id + 1, 0, 0, 0}); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| // recycle_rowset_key |
| begin_key = recycle_key_prefix(instance_id); |
| end_key = recycle_key_prefix(instance_id + '\xff'); |
| ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(it->size(), 0); |
| } |
| |
// Version-related KVs (partition/table versions, delete-bitmap update locks,
// MoW tablet-compaction keys) must only be recycled once their owning
// partitions — and, for table-level KVs, all indexes — are gone.
TEST(RecyclerTest, recycle_versions) {
    config::retention_seconds = 0;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::vector<int64_t> index_ids {20001, 20002, 20003, 20004, 20005};
    std::vector<int64_t> partition_ids {30001, 30002, 30003, 30004, 30005, 30006};
    constexpr int table_id = 10000;

    // One tablet per (index, partition) pair.
    int64_t tablet_id = 40000;
    for (auto index_id : index_ids) {
        for (auto partition_id : partition_ids) {
            create_tablet(txn_kv.get(), table_id, index_id, partition_id, ++tablet_id);
        }
    }
    for (auto partition_id : partition_ids) {
        create_partition_version_kv(txn_kv.get(), table_id, partition_id);
    }
    create_table_version_kv(txn_kv.get(), table_id);
    // Drop partitions — all but the last one (30006), which stays live
    for (int i = 0; i < 5; ++i) {
        create_recycle_partiton(txn_kv.get(), table_id, partition_ids[i], index_ids);
    }
    // create delete bitmap update lock kv (lock_id -1, two initiators)
    create_delete_bitmap_update_lock_kv(txn_kv.get(), table_id, -1, 100, 60);
    create_delete_bitmap_update_lock_kv(txn_kv.get(), table_id, -1, 110, 60);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    // Recycle all partitions in table except 30006
    ASSERT_EQ(recycler.recycle_partitions(), 0);
    ASSERT_EQ(recycler.recycle_versions(), 0); // `recycle_versions` should do nothing
    // All partition version kvs except version of partition 30006 must have been deleted
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    auto partition_key_begin = partition_version_key({instance_id, db_id, table_id, 0});
    auto partition_key_end = partition_version_key({instance_id, db_id, table_id, INT64_MAX});
    std::unique_ptr<RangeGetIterator> iter;
    ASSERT_EQ(txn->get(partition_key_begin, partition_key_end, &iter), TxnErrorCode::TXN_OK);
    ASSERT_EQ(iter->size(), 1);
    auto [pk, pv] = iter->next();
    EXPECT_EQ(pk, partition_version_key({instance_id, db_id, table_id, 30006}));
    // Table 10000's table version must not be deleted (table still has a live partition)
    auto table_key_begin = table_version_key({instance_id, db_id, 0});
    auto table_key_end = table_version_key({instance_id, db_id, INT64_MAX});
    ASSERT_EQ(txn->get(table_key_begin, table_key_end, &iter), TxnErrorCode::TXN_OK);
    ASSERT_EQ(iter->size(), 1);
    auto [tk, tv] = iter->next();
    EXPECT_EQ(tk, table_version_key({instance_id, db_id, 10000}));
    // delete bitmap update lock must not be deleted while the table lives
    auto delete_bitmap_update_lock_key =
            meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
    std::string delete_bitmap_update_lock_val;
    ASSERT_EQ(txn->get(delete_bitmap_update_lock_key, &delete_bitmap_update_lock_val),
              TxnErrorCode::TXN_OK);
    // Both MoW tablet-compaction KVs (one per initiator above) still present.
    auto tablet_compaction_key0 = mow_tablet_compaction_key({instance_id, table_id, 0});
    auto tablet_compaction_key1 = mow_tablet_compaction_key({instance_id, table_id + 1, 0});
    ASSERT_EQ(txn->get(tablet_compaction_key0, tablet_compaction_key1, &iter),
              TxnErrorCode::TXN_OK);
    ASSERT_EQ(iter->size(), 2);

    // Drop indexes
    for (auto index_id : index_ids) {
        create_recycle_index(txn_kv.get(), table_id, index_id);
    }
    // Recycle all indexes of the table, that is, the table has been dropped
    ASSERT_EQ(recycler.recycle_indexes(), 0);
    // `recycle_versions` should delete all version kvs of the dropped table
    ASSERT_EQ(recycler.recycle_versions(), 0);
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    ASSERT_EQ(txn->get(partition_key_begin, partition_key_end, &iter), TxnErrorCode::TXN_OK);
    ASSERT_EQ(iter->size(), 0);
    ASSERT_EQ(txn->get(table_key_begin, table_key_end, &iter), TxnErrorCode::TXN_OK);
    ASSERT_EQ(iter->size(), 0);
    // delete bitmap update lock must be deleted
    ASSERT_EQ(txn->get(delete_bitmap_update_lock_key, &delete_bitmap_update_lock_val),
              TxnErrorCode::TXN_KEY_NOT_FOUND);
    // ... and so must the MoW tablet-compaction KVs.
    ASSERT_EQ(txn->get(tablet_compaction_key0, tablet_compaction_key1, &iter),
              TxnErrorCode::TXN_OK);
    ASSERT_EQ(iter->size(), 0);
}
| |
| TEST(RecyclerTest, advance_pending_txn) { |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| auto rs = std::make_shared<MockResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| int64_t db_id = 666; |
| int64_t table_id = 1234; |
| int64_t txn_id = -1; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("advance_pending_txn"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(1); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| sleep(1); |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| TxnInfoPB txn_info_pb; |
| get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| |
| TEST(RecyclerTest, advance_pending_txn_and_rebegin) { |
| config::label_keep_max_second = 0; |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| auto rs = std::make_shared<MockResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| int64_t db_id = 888; |
| int64_t table_id = 1234; |
| int64_t txn_id = -1; |
| std::string cloud_unique_id = "test_cloud_unique_id22131"; |
| std::string label = "advance_pending_txn_and_rebegin"; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(1); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| sleep(1); |
| ASSERT_EQ(recycler.abort_timeout_txn(), 0); |
| TxnInfoPB txn_info_pb; |
| get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(1); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| TEST(RecyclerTest, recycle_expired_txn_label) { |
| config::label_keep_max_second = 0; |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| auto rs = std::make_shared<MockResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| int64_t db_id = 88812123; |
| int64_t table_id = 12131234; |
| int64_t txn_id = -1; |
| std::string cloud_unique_id = "test_cloud_unique_id2"; |
| std::string label = "recycle_expired_txn_label"; |
| { |
| // 1. begin_txn |
| // 2. abort_txn by db_id and label |
| // 3. recycle_expired_txn_label |
| // 4. check |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(100000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| recycler.abort_timeout_txn(); |
| TxnInfoPB txn_info_pb; |
| ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), 0); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED); |
| |
| // abort txn by db_id and label |
| { |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_label(label); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| recycler.recycle_expired_txn_label(); |
| ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2); |
| ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0); |
| } |
| |
| { |
| // 1. begin_txn |
| // 2. abort_txn by db_id and txn_id |
| // 3. recycle_expired_txn_label |
| // 4. check |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(10000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| sleep(1); |
| recycler.abort_timeout_txn(); |
| TxnInfoPB txn_info_pb; |
| get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED); |
| |
| // abort txn by db_id and label |
| { |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED); |
| } |
| recycler.recycle_expired_txn_label(); |
| ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2); |
| ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0); |
| } |
| |
| { |
| // 1. begin_txn |
| // 2. commit_txn |
| // 3. recycle_expired_txn_label |
| // 4. check |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(10000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| sleep(1); |
| recycler.abort_timeout_txn(); |
| TxnInfoPB txn_info_pb; |
| get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED); |
| |
| // commit_txn |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| recycler.recycle_expired_txn_label(); |
| ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2); |
| ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0); |
| } |
| |
| label = "recycle_expired_txn_label_with_sub_txn"; |
| int64_t table2_id = 12131278; |
| { |
| // 1. begin_txn |
| // 2. begin_sub_txn2 |
| // 3. begin_sub_txn3 |
| // 4. abort_sub_txn3 |
| // 5. commit_txn |
| // 6. recycle_expired_txn_label |
| // 7. check |
| [[maybe_unused]] int64_t sub_txn_id1 = -1; |
| int64_t sub_txn_id2 = -1; |
| int64_t sub_txn_id3 = -1; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(10000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| txn_id = res.txn_id(); |
| sub_txn_id1 = txn_id; |
| ASSERT_GT(txn_id, -1); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| sleep(1); |
| recycler.abort_timeout_txn(); |
| TxnInfoPB txn_info_pb; |
| get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb); |
| ASSERT_EQ(txn_info_pb.status(), TxnStatusPB::TXN_STATUS_PREPARED); |
| |
| // 2. begin sub_txn2 |
| { |
| brpc::Controller cntl; |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(0); |
| req.set_db_id(db_id); |
| req.set_label("test_sub_label1"); |
| req.mutable_table_ids()->Add(table_id); |
| req.mutable_table_ids()->Add(table2_id); |
| BeginSubTxnResponse res; |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().table_ids().size(), 2); |
| ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1); |
| ASSERT_TRUE(res.has_sub_txn_id()); |
| sub_txn_id2 = res.sub_txn_id(); |
| ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); |
| } |
| |
| // 3. begin sub_txn3 |
| { |
| brpc::Controller cntl; |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(1); |
| req.set_db_id(db_id); |
| req.set_label("test_sub_label2"); |
| req.mutable_table_ids()->Add(table_id); |
| req.mutable_table_ids()->Add(table2_id); |
| req.mutable_table_ids()->Add(table_id); |
| BeginSubTxnResponse res; |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().table_ids().size(), 3); |
| ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); |
| ASSERT_TRUE(res.has_sub_txn_id()); |
| sub_txn_id3 = res.sub_txn_id(); |
| ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); |
| ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]); |
| } |
| |
| // 4. abort sub_txn3 |
| { |
| brpc::Controller cntl; |
| AbortSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(2); |
| req.set_sub_txn_id(sub_txn_id3); |
| req.set_db_id(db_id); |
| req.mutable_table_ids()->Add(table_id); |
| req.mutable_table_ids()->Add(table2_id); |
| AbortSubTxnResponse res; |
| meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| // check txn state |
| ASSERT_EQ(res.txn_info().table_ids().size(), 2); |
| ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); |
| ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); |
| ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]); |
| } |
| |
| // 4. commit_txn |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_txn_load(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| // check txn_index_key for sub_txn_id exist |
| for (auto i : {sub_txn_id2, sub_txn_id3}) { |
| std::string key = txn_index_key({mock_instance, i}); |
| std::string val; |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| } |
| // 5. recycle |
| recycler.recycle_expired_txn_label(); |
| ASSERT_EQ(get_txn_info(txn_kv, mock_instance, db_id, txn_id, txn_info_pb), -2); |
| ASSERT_EQ(check_recycle_txn_keys(txn_kv, mock_instance, db_id, txn_id, label), 0); |
| // check txn_index_key for sub_txn_id are deleted |
| for (auto i : {sub_txn_id2, sub_txn_id3}) { |
| std::string key = txn_index_key({mock_instance, i}); |
| std::string val; |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); |
| } |
| } |
| } |
| |
| void create_object_file_pb(std::string prefix, std::vector<ObjectFilePB>* object_files, |
| int file_num = 10) { |
| for (int i = 0; i < file_num; ++i) { |
| ObjectFilePB object_file; |
| // create object in S3, pay attention to the relative path |
| object_file.set_relative_path(prefix + "/obj_" + std::to_string(i)); |
| object_file.set_etag(""); |
| object_files->push_back(object_file); |
| } |
| } |
| |
| TEST(RecyclerTest, recycle_copy_jobs) { |
| using namespace std::chrono; |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| // create internal/external stage |
| std::string internal_stage_id = "internal"; |
| std::string external_stage_id = "external"; |
| std::string nonexist_internal_stage_id = "non_exist_internal"; |
| std::string nonexist_external_stage_id = "non_exist_external"; |
| |
| InstanceInfoPB instance_info; |
| create_instance(internal_stage_id, external_stage_id, instance_info); |
| InstanceRecycler recycler(txn_kv, instance_info, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| auto internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second; |
| |
| // create internal stage copy job with finish status |
| { |
| std::vector<ObjectFilePB> object_files; |
| create_object_file_pb("0", &object_files); |
| ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0); |
| create_copy_job(txn_kv.get(), internal_stage_id, 0, StagePB::INTERNAL, CopyJobPB::FINISH, |
| object_files, 0); |
| } |
| // create internal stage copy job and files with loading status which is timeout |
| { |
| std::vector<ObjectFilePB> object_files; |
| create_object_file_pb("5", &object_files); |
| ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0); |
| create_copy_job(txn_kv.get(), internal_stage_id, 5, StagePB::INTERNAL, CopyJobPB::LOADING, |
| object_files, 0); |
| } |
| // create internal stage copy job and files with loading status which is not timeout |
| { |
| std::vector<ObjectFilePB> object_files; |
| create_object_file_pb("6", &object_files); |
| ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0); |
| create_copy_job(txn_kv.get(), internal_stage_id, 6, StagePB::INTERNAL, CopyJobPB::LOADING, |
| object_files, 9963904963479L); |
| } |
| // create internal stage copy job with deleted stage id |
| { |
| std::vector<ObjectFilePB> object_files; |
| create_object_file_pb("8", &object_files); |
| ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0); |
| ASSERT_EQ(0, create_copy_job(txn_kv.get(), nonexist_internal_stage_id, 8, StagePB::INTERNAL, |
| CopyJobPB::FINISH, object_files, 0)); |
| } |
| // ----- external stage ---- |
| // <table_id, timeout_time, start_time, finish_time, job_status> |
| std::vector<std::tuple<int, int64_t, int64_t, int64_t, CopyJobPB::JobStatus>> |
| external_copy_jobs; |
| uint64_t current_time = |
| duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
| int64_t expire_time = current_time - config::copy_job_max_retention_second * 1000 - 1000; |
| int64_t not_expire_time = current_time - config::copy_job_max_retention_second * 1000 / 2; |
| // create external stage copy job with start time not expired and no finish time |
| external_copy_jobs.emplace_back(1, 0, 9963904963479L, 0, CopyJobPB::FINISH); |
| // create external stage copy job with start time expired and no finish time |
| external_copy_jobs.emplace_back(2, 0, expire_time, 0, CopyJobPB::FINISH); |
| // create external stage copy job with start time not expired and finish time not expired |
| external_copy_jobs.emplace_back(9, 0, expire_time, not_expire_time, CopyJobPB::FINISH); |
| // create external stage copy job with start time expired and finish time expired |
| external_copy_jobs.emplace_back(10, 0, expire_time, expire_time + 1, CopyJobPB::FINISH); |
| // create external stage copy job and files with loading status which is timeout |
| external_copy_jobs.emplace_back(3, 0, 0, 0, CopyJobPB::LOADING); |
| // create external stage copy job and files with loading status which is not timeout |
| external_copy_jobs.emplace_back(4, 9963904963479L, 0, 0, CopyJobPB::LOADING); |
| for (const auto& [table_id, timeout_time, start_time, finish_time, job_status] : |
| external_copy_jobs) { |
| std::vector<ObjectFilePB> object_files; |
| create_object_file_pb(external_stage_id + "_" + std::to_string(table_id), &object_files); |
| create_copy_job(txn_kv.get(), external_stage_id, table_id, StagePB::EXTERNAL, job_status, |
| object_files, timeout_time, start_time, finish_time); |
| } |
| // create external stage copy job with deleted stage id |
| { |
| std::vector<ObjectFilePB> object_files; |
| create_object_file_pb(nonexist_external_stage_id + "_7", &object_files); |
| ASSERT_EQ(0, create_copy_job(txn_kv.get(), nonexist_external_stage_id, 7, StagePB::EXTERNAL, |
| CopyJobPB::FINISH, object_files, 0)); |
| } |
| { |
| // <stage_id, table_id> |
| std::vector<std::tuple<std::string, int>> stage_table_files; |
| stage_table_files.emplace_back(internal_stage_id, 0); |
| stage_table_files.emplace_back(nonexist_internal_stage_id, 8); |
| stage_table_files.emplace_back(external_stage_id, 1); |
| stage_table_files.emplace_back(external_stage_id, 2); |
| stage_table_files.emplace_back(external_stage_id, 9); |
| stage_table_files.emplace_back(external_stage_id, 10); |
| stage_table_files.emplace_back(external_stage_id, 3); |
| stage_table_files.emplace_back(external_stage_id, 4); |
| stage_table_files.emplace_back(external_stage_id, 9); |
| stage_table_files.emplace_back(nonexist_external_stage_id, 7); |
| // check copy files |
| for (const auto& [stage_id, table_id] : stage_table_files) { |
| int file_num = 0; |
| ASSERT_EQ(0, get_copy_file_num(txn_kv.get(), stage_id, table_id, &file_num)); |
| ASSERT_EQ(10, file_num); |
| } |
| } |
| |
| ASSERT_EQ(recycler.recycle_copy_jobs(), 0); |
| |
| // check object files |
| std::vector<std::tuple<std::shared_ptr<StorageVaultAccessor>, std::string, int>> |
| prefix_and_files_list; |
| prefix_and_files_list.emplace_back(internal_accessor, "0/", 0); |
| prefix_and_files_list.emplace_back(internal_accessor, "5/", 10); |
| prefix_and_files_list.emplace_back(internal_accessor, "6/", 10); |
| prefix_and_files_list.emplace_back(internal_accessor, "8/", 10); |
| for (const auto& [accessor, relative_path, file_num] : prefix_and_files_list) { |
| std::unique_ptr<ListIterator> list_iter; |
| ASSERT_EQ(0, accessor->list_directory(relative_path, &list_iter)); |
| int cnt = 0; |
| while (list_iter->next().has_value()) { |
| ++cnt; |
| } |
| ASSERT_EQ(file_num, cnt) << relative_path; |
| } |
| |
| // check fdb kvs |
| // <stage_id, table_id, expected_files, expected_job_exists> |
| std::vector<std::tuple<std::string, int, int, bool>> stage_table_files; |
| stage_table_files.emplace_back(internal_stage_id, 0, 0, false); |
| stage_table_files.emplace_back(nonexist_internal_stage_id, 8, 0, false); |
| stage_table_files.emplace_back(internal_stage_id, 5, 0, false); |
| stage_table_files.emplace_back(internal_stage_id, 6, 10, true); |
| stage_table_files.emplace_back(external_stage_id, 1, 10, true); |
| stage_table_files.emplace_back(external_stage_id, 2, 0, false); |
| stage_table_files.emplace_back(external_stage_id, 9, 10, true); |
| stage_table_files.emplace_back(external_stage_id, 10, 0, false); |
| stage_table_files.emplace_back(external_stage_id, 3, 0, false); |
| stage_table_files.emplace_back(external_stage_id, 4, 10, true); |
| stage_table_files.emplace_back(nonexist_external_stage_id, 7, 0, false); |
| for (const auto& [stage_id, table_id, files, expected_job_exists] : stage_table_files) { |
| // check copy files |
| int file_num = 0; |
| ASSERT_EQ(0, get_copy_file_num(txn_kv.get(), stage_id, table_id, &file_num)) << table_id; |
| EXPECT_EQ(files, file_num) << table_id; |
| // check copy jobs |
| bool exist = false; |
| ASSERT_EQ(0, copy_job_exists(txn_kv.get(), stage_id, table_id, &exist)) << table_id; |
| EXPECT_EQ(expected_job_exists, exist) << table_id; |
| } |
| } |
| |
// Verify recycle_copy_jobs when batched object deletion fails and then
// succeeds: first force every MockAccessor::delete_files call to return -1
// via a sync point and check that jobs whose objects could not be deleted are
// kept (files and kvs intact), while jobs whose stage no longer exists are
// still removed (no object deletion needed); then clear the failure injection
// and check that a second recycle pass cleans everything up.
TEST(RecyclerTest, recycle_batch_copy_jobs) {
    auto sp = SyncPoint::get_instance();
    // Clear all sync-point callbacks when the test scope exits.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
    // Inject failure: every batched delete_files call fails with -1.
    sp->set_call_back("MockAccessor::delete_files", [](auto&& args) {
        auto* ret = try_any_cast_ret<int>(args);
        ret->first = -1;
        ret->second = true;
    });
    sp->enable_processing();
    using namespace std::chrono;
    auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
    ASSERT_NE(txn_kv.get(), nullptr);
    ASSERT_EQ(txn_kv->init(), 0);

    // create internal/external stage
    std::string internal_stage_id = "internal";
    std::string external_stage_id = "external";
    std::string nonexist_internal_stage_id = "non_exist_internal";
    std::string nonexist_external_stage_id = "non_exist_external";

    InstanceInfoPB instance_info;
    create_instance(internal_stage_id, external_stage_id, instance_info);
    InstanceRecycler recycler(txn_kv, instance_info, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    const auto& internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second;

    // create internal stage copy job with finish status
    // (1000 files to exercise batched deletion)
    {
        std::vector<ObjectFilePB> object_files;
        create_object_file_pb("0", &object_files, 1000);
        ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0);
        create_copy_job(txn_kv.get(), internal_stage_id, 0, StagePB::INTERNAL, CopyJobPB::FINISH,
                        object_files, 0);
    }
    {
        std::vector<ObjectFilePB> object_files;
        create_object_file_pb("4", &object_files);
        ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0);
        create_copy_job(txn_kv.get(), internal_stage_id, 4, StagePB::INTERNAL, CopyJobPB::FINISH,
                        object_files, 0);
    }
    // create internal stage copy job with deleted stage id
    {
        std::vector<ObjectFilePB> object_files;
        create_object_file_pb("8", &object_files);
        ASSERT_EQ(create_object_files(internal_accessor.get(), &object_files), 0);
        ASSERT_EQ(0, create_copy_job(txn_kv.get(), nonexist_internal_stage_id, 8, StagePB::INTERNAL,
                                     CopyJobPB::FINISH, object_files, 0));
    }

    // First pass: delete_files fails, so FINISH jobs with live stages must
    // not be recycled. recycle_copy_jobs itself still returns 0.
    ASSERT_EQ(recycler.recycle_copy_jobs(), 0);

    // check object files: nothing was deleted because delete_files failed
    std::vector<std::tuple<std::shared_ptr<StorageVaultAccessor>, std::string, int>>
            prefix_and_files_list;
    prefix_and_files_list.emplace_back(internal_accessor, "0/", 1000);
    prefix_and_files_list.emplace_back(internal_accessor, "4/", 10);
    prefix_and_files_list.emplace_back(internal_accessor, "8/", 10);
    for (const auto& [accessor, relative_path, file_num] : prefix_and_files_list) {
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_directory(relative_path, &list_iter));
        int cnt = 0;
        while (list_iter->next().has_value()) {
            ++cnt;
        }
        ASSERT_EQ(file_num, cnt);
    }

    // check fdb kvs: jobs 0 and 4 are kept; job 8 (stage deleted) is removed
    // without touching objects.
    // <stage_id, table_id, expected_files, expected_job_exists>
    std::vector<std::tuple<std::string, int, int, bool>> stage_table_files;
    stage_table_files.emplace_back(internal_stage_id, 0, 1000, true);
    stage_table_files.emplace_back(internal_stage_id, 4, 10, true);
    stage_table_files.emplace_back(nonexist_internal_stage_id, 8, 0, false);
    for (const auto& [stage_id, table_id, files, expected_job_exists] : stage_table_files) {
        // check copy files
        int file_num = 0;
        ASSERT_EQ(0, get_copy_file_num(txn_kv.get(), stage_id, table_id, &file_num)) << table_id;
        EXPECT_EQ(files, file_num) << table_id;
        // check copy jobs
        bool exist = false;
        ASSERT_EQ(0, copy_job_exists(txn_kv.get(), stage_id, table_id, &exist)) << table_id;
        EXPECT_EQ(expected_job_exists, exist) << table_id;
    }

    // Second pass: remove the failure injection so deletion succeeds.
    sp->clear_call_back("MockAccessor::delete_files");
    ASSERT_EQ(recycler.recycle_copy_jobs(), 0);

    // check object files: objects of FINISH jobs are now deleted; "8/" stays
    // because its job was already removed without object deletion.
    prefix_and_files_list.clear();
    prefix_and_files_list.emplace_back(internal_accessor, "0/", 0);
    prefix_and_files_list.emplace_back(internal_accessor, "4/", 0);
    prefix_and_files_list.emplace_back(internal_accessor, "8/", 10);
    for (const auto& [accessor, relative_path, file_num] : prefix_and_files_list) {
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_directory(relative_path, &list_iter));
        int cnt = 0;
        while (list_iter->next().has_value()) {
            ++cnt;
        }
        ASSERT_EQ(file_num, cnt);
    }

    // check fdb kvs: all copy-file and copy-job kvs are gone now
    // <stage_id, table_id, expected_files, expected_job_exists>
    stage_table_files.clear();
    stage_table_files.emplace_back(internal_stage_id, 0, 0, false);
    stage_table_files.emplace_back(internal_stage_id, 4, 0, false);
    stage_table_files.emplace_back(nonexist_internal_stage_id, 8, 0, false);
    for (const auto& [stage_id, table_id, files, expected_job_exists] : stage_table_files) {
        // check copy files
        int file_num = 0;
        ASSERT_EQ(0, get_copy_file_num(txn_kv.get(), stage_id, table_id, &file_num)) << table_id;
        EXPECT_EQ(files, file_num) << table_id;
        // check copy jobs
        bool exist = false;
        ASSERT_EQ(0, copy_job_exists(txn_kv.get(), stage_id, table_id, &exist)) << table_id;
        EXPECT_EQ(expected_job_exists, exist) << table_id;
    }
}
| |
// Verify InstanceRecycler::recycle_stage: put 10 objects under a stage's
// prefix, insert a RecycleStagePB kv, hook the "recycle_stage:get_accessor"
// sync point so the recycler uses our mock accessor, then check that both the
// objects and the recycle-stage kv are removed.
TEST(RecyclerTest, recycle_stage) {
    [[maybe_unused]] auto sp = SyncPoint::get_instance();
    // Clear all sync-point callbacks when the test scope exits.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
    auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
    ASSERT_NE(txn_kv.get(), nullptr);
    ASSERT_EQ(txn_kv->init(), 0);

    std::string stage_prefix = "prefix/stage/bob/bc9fff5e-5f91-4168-8eaa-0afd6667f7ef";
    ObjectStoreInfoPB object_info;
    object_info.set_id("obj_id");
    InstanceInfoPB instance;
    instance.set_instance_id(mock_instance);
    instance.add_obj_info()->CopyFrom(object_info);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    // Seed the stage with 10 files that recycle_stage must delete.
    auto accessor = recycler.accessor_map_.begin()->second;
    for (int i = 0; i < 10; ++i) {
        accessor->put_file(std::to_string(i) + ".csv", "");
    }

    // Make recycle_stage use our mock accessor instead of building a real one
    // from the stage's object-store info.
    SyncPoint::CallbackGuard guard;
    sp->set_call_back(
            "recycle_stage:get_accessor",
            [&](auto&& args) {
                *try_any_cast<std::shared_ptr<StorageVaultAccessor>*>(args[0]) = accessor;
                auto* ret = try_any_cast_ret<int>(args);
                ret->first = 0;
                ret->second = true;
            },
            &guard);
    sp->enable_processing();

    // Insert the recycle-stage kv that marks this stage for recycling.
    std::string key;
    std::string val;
    RecycleStageKeyInfo key_info {mock_instance, "stage_id"};
    recycle_stage_key(key_info, &key);
    StagePB stage;
    stage.add_mysql_user_name("user_name");
    stage.add_mysql_user_id("user_id");
    stage.mutable_obj_info()->set_id("1");
    stage.mutable_obj_info()->set_prefix(stage_prefix);
    RecycleStagePB recycle_stage;
    recycle_stage.set_instance_id(mock_instance);
    recycle_stage.mutable_stage()->CopyFrom(stage);
    val = recycle_stage.SerializeAsString();
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
    txn->put(key, val);
    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->get(key, &val));

    // recycle stage: all objects and the recycle-stage kv must be gone after.
    ASSERT_EQ(0, recycler.recycle_stage());
    std::unique_ptr<ListIterator> list_iter;
    ASSERT_EQ(0, accessor->list_all(&list_iter));
    ASSERT_FALSE(list_iter->has_next());
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
    ASSERT_EQ(TxnErrorCode::TXN_KEY_NOT_FOUND, txn->get(key, &val));
}
| |
// Verify InstanceRecycler::recycle_deleted_instance: populate the instance
// with txn labels, partition/table version kvs, tablets, rowsets (recycle,
// tmp, and committed) plus their objects, then check that recycling a deleted
// instance wipes every object in every accessor and every kv range (txn,
// version, meta, recycle, stats, copy) belonging to the instance.
TEST(RecyclerTest, recycle_deleted_instance) {
    auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
    ASSERT_NE(txn_kv.get(), nullptr);
    ASSERT_EQ(txn_kv->init(), 0);
    // create internal/external stage
    std::string internal_stage_id = "internal";
    std::string external_stage_id = "external";
    std::string nonexist_internal_stage_id = "non_exist_internal";
    std::string nonexist_external_stage_id = "non_exist_external";

    InstanceInfoPB instance_info;
    create_instance(internal_stage_id, external_stage_id, instance_info);
    InstanceRecycler recycler(txn_kv, instance_info, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    // create txn key
    for (size_t i = 0; i < 100; i++) {
        ASSERT_EQ(0, create_txn_label_kv(txn_kv.get(), fmt::format("fake_label{}", i), i));
    }
    // create partition version key
    for (size_t i = 101; i < 200; i += 2) {
        ASSERT_EQ(0, create_partition_version_kv(txn_kv.get(), i, i + 1));
    }
    // create table version key
    for (size_t i = 101; i < 200; i += 2) {
        ASSERT_EQ(0, create_table_version_kv(txn_kv.get(), i));
    }
    // create meta key
    // Schemas 0..4 with i inverted indexes each, cycled over the rowsets below.
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    constexpr int table_id = 10000, index_id = 10001, partition_id = 10002;
    auto accessor = recycler.accessor_map_.begin()->second;
    int64_t tablet_id_base = 10100;
    int64_t txn_id_base = 114115;
    for (int i = 0; i < 100; ++i) {
        int64_t tablet_id = tablet_id_base + i;
        // create stats key
        create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id);
        for (int j = 0; j < 10; ++j) {
            auto rowset = create_rowset("recycle_tablet", tablet_id, index_id, 5, schemas[j % 5]);
            // create recycle key
            create_recycle_rowset(txn_kv.get(), accessor.get(), rowset,
                                  j % 10 < 2 ? RecycleRowsetPB::PREPARE : RecycleRowsetPB::COMPACT,
                                  j & 1);
            auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5,
                                            schemas[j % 5], txn_id_base + j);
            // create meta key
            create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1);
        }
        for (int j = 0; j < 10; ++j) {
            // create meta key
            create_committed_rowset(txn_kv.get(), accessor.get(), "recycle_indexes", tablet_id, j);
        }
    }

    ASSERT_EQ(0, recycler.recycle_deleted_instance());

    // check if all the objects are deleted
    std::for_each(recycler.accessor_map_.begin(), recycler.accessor_map_.end(),
                  [&](const auto& entry) {
                      std::unique_ptr<ListIterator> list_iter;
                      auto& acc = entry.second;
                      ASSERT_EQ(0, acc->list_all(&list_iter));
                      ASSERT_FALSE(list_iter->has_next());
                  });

    // check if all the keys are deleted
    // check all related kv have been deleted
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::unique_ptr<RangeGetIterator> it;

    std::string start_txn_key = txn_key_prefix(instance_id);
    std::string end_txn_key = txn_key_prefix(instance_id + '\x00');
    ASSERT_EQ(txn->get(start_txn_key, end_txn_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    std::string start_partition_version_key = partition_version_key({instance_id, 0, 0, 0});
    std::string end_partition_version_key = partition_version_key({instance_id, INT64_MAX, 0, 0});
    ASSERT_EQ(txn->get(start_partition_version_key, end_partition_version_key, &it),
              TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    std::string start_table_version_key = table_version_key({instance_id, 0, 0});
    std::string end_table_version_key = table_version_key({instance_id, INT64_MAX, 0});
    ASSERT_EQ(txn->get(start_table_version_key, end_table_version_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    std::string start_version_key = version_key_prefix(instance_id);
    std::string end_version_key = version_key_prefix(instance_id + '\x00');
    ASSERT_EQ(txn->get(start_version_key, end_version_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    std::string start_meta_key = meta_key_prefix(instance_id);
    std::string end_meta_key = meta_key_prefix(instance_id + '\x00');
    ASSERT_EQ(txn->get(start_meta_key, end_meta_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    auto begin_recycle_key = recycle_key_prefix(instance_id);
    auto end_recycle_key = recycle_key_prefix(instance_id + '\x00');
    ASSERT_EQ(txn->get(begin_recycle_key, end_recycle_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    std::string start_stats_tablet_key = stats_tablet_key({instance_id, 0, 0, 0, 0});
    std::string end_stats_tablet_key = stats_tablet_key({instance_id, INT64_MAX, 0, 0, 0});
    ASSERT_EQ(txn->get(start_stats_tablet_key, end_stats_tablet_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);

    std::string start_copy_key = copy_key_prefix(instance_id);
    std::string end_copy_key = copy_key_prefix(instance_id + '\x00');
    ASSERT_EQ(txn->get(start_copy_key, end_copy_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
}
| |
// Verify that multiple Recycler processes sharing one TxnKv coordinate via
// job-lease kvs: with 10 instances and three recyclers (concurrency 2 each),
// every instance must be recycled exactly once in total. The sync point on
// "InstanceRecycler.do_recycle" replaces real recycling with a sleep + counter.
TEST(RecyclerTest, multi_recycler) {
    config::recycle_concurrency = 2;
    config::recycle_interval_seconds = 10;
    // Short lease so a stalled recycler's job can be taken over quickly.
    config::recycle_job_lease_expired_ms = 1000;
    auto mem_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(mem_kv->init(), 0);

    // Counts how many times any recycler runs do_recycle for an instance.
    std::atomic_int count {0};
    auto sp = SyncPoint::get_instance();

    SyncPoint::CallbackGuard guard;
    sp->set_call_back(
            "InstanceRecycler.do_recycle",
            [&count](auto&&) {
                sleep(1);
                ++count;
            },
            &guard);
    sp->enable_processing();

    // Register 10 instances for the recyclers to discover.
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn));
    for (int i = 0; i < 10; ++i) {
        InstanceInfoPB instance;
        instance.set_instance_id(std::to_string(i));
        auto obj_info = instance.add_obj_info();
        obj_info->set_id("multi_recycler_test");
        obj_info->set_ak(config::test_s3_ak);
        obj_info->set_sk(config::test_s3_sk);
        obj_info->set_endpoint(config::test_s3_endpoint);
        obj_info->set_region(config::test_s3_region);
        obj_info->set_bucket(config::test_s3_bucket);
        obj_info->set_prefix("multi_recycler_test");
        InstanceKeyInfo key_info {std::to_string(i)};
        std::string key;
        instance_key(key_info, &key);
        std::string val = instance.SerializeAsString();
        txn->put(key, val);
    }
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());

    // Start three competing recyclers; r3 starts late to exercise takeover.
    Recycler r1(mem_kv);
    r1.ip_port_ = "r1:p1";
    r1.start(nullptr);
    Recycler r2(mem_kv);
    r2.ip_port_ = "r2:p2";
    r2.start(nullptr);
    Recycler r3(mem_kv);
    r3.ip_port_ = "r3:p3";
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    r3.start(nullptr);

    // Give them time to work through all 10 instances, then stop.
    std::this_thread::sleep_for(std::chrono::seconds(5));
    r1.stop();
    r2.stop();
    r3.stop();

    // Every instance's recycle job must be finished (IDLE with a finish time).
    ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn));
    for (int i = 0; i < 10; ++i) {
        JobRecycleKeyInfo key_info {std::to_string(i)};
        JobRecyclePB job_info;
        std::string key;
        std::string val;
        job_recycle_key(key_info, &key);
        ASSERT_EQ(TxnErrorCode::TXN_OK, txn->get(key, &val)) << i;
        ASSERT_TRUE(job_info.ParseFromString(val));
        EXPECT_EQ(JobRecyclePB::IDLE, job_info.status());
        EXPECT_GT(job_info.last_finish_time_ms(), 0);
        std::cout << "host: " << job_info.ip_port() << " finish recycle job of instance_id: " << i
                  << std::endl;
    }
    // Each instance is recycled exactly once across the three recyclers.
    EXPECT_EQ(count, 10);
}
| |
| TEST(RecyclerTest, safe_exit) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| auto checker_ = std::make_unique<Recycler>(txn_kv); |
| brpc::Server server; |
| int ret = checker_->start(&server); |
| ASSERT_TRUE(ret == 0); |
| checker_->stop(); |
| } |
| |
| TEST(CheckerTest, safe_exit) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| auto checker_ = std::make_unique<Checker>(txn_kv); |
| int ret = checker_->start(); |
| ASSERT_TRUE(ret == 0); |
| checker_->stop(); |
| } |
| |
// Verify InstanceChecker::do_inverted_check returns 0 on a consistent
// instance: committed rowsets are created through the mock accessor with
// matching meta kvs, so the inverted check (objects -> kv) finds no orphans.
TEST(CheckerTest, normal_inverted_check) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("1");

    // NOTE(review): this callback injects return value 0 at the
    // "InstanceChecker::do_inverted_check" sync point — presumably to skip a
    // step unavailable under MemTxnKv/MockAccessor; confirm against
    // InstanceChecker::do_inverted_check.
    auto sp = SyncPoint::get_instance();
    SyncPoint::CallbackGuard guard;
    sp->set_call_back(
            "InstanceChecker::do_inverted_check",
            [](auto&& args) {
                auto* ret = try_any_cast_ret<int>(args);
                ret->first = 0;
                ret->second = true;
            },
            &guard);
    sp->enable_processing();
    // Disable sync-point processing when the test scope exits.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->disable_processing(); });

    InstanceChecker checker(txn_kv, instance_id);
    ASSERT_EQ(checker.init(instance), 0);
    // Add some visible rowsets along with some rowsets that should be recycled
    // call inverted check after do recycle which would sweep all the rowsets not visible
    auto accessor = checker.accessor_map_.begin()->second;
    for (int t = 10001; t <= 10100; ++t) {
        for (int v = 0; v < 10; ++v) {
            create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 1);
        }
    }
    for (int t = 10101; t <= 10200; ++t) {
        for (int v = 0; v < 10; ++v) {
            create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 5);
        }
    }
    ASSERT_EQ(checker.do_inverted_check(), 0);
}
| |
// TODO(Xiaocc): We need one mocked accessor which provides one async stream like list function
// to do the following test
TEST(CheckerTest, DISABLED_abnormal_inverted_check) {
    // Disabled: exercises do_inverted_check against a real S3 vault with rowset
    // KVs that have no corresponding data files; expects the check to fail.
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    // Instance backed by the test S3 config (requires real credentials to run).
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("1");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("CheckerTest");

    // NOTE(review): this callback forces the sync-point return value to success
    // (0/true), yet the test ends with ASSERT_NE(..., 0) — presumably the real
    // (non-mocked) path is meant to run here once a mocked accessor exists;
    // confirm intent before re-enabling.
    auto sp = SyncPoint::get_instance();
    SyncPoint::CallbackGuard guard;
    sp->set_call_back(
            "InstanceChecker::do_inverted_check",
            [](auto&& args) {
                auto* ret = try_any_cast_ret<int>(args);
                ret->first = 0;
                ret->second = true;
            },
            &guard);
    sp->enable_processing();
    // Deferred cleanup: disable sync-point processing when the test scope exits.
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->disable_processing(); });

    InstanceChecker checker(txn_kv, instance_id);
    ASSERT_EQ(checker.init(instance), 0);
    // Add some visible rowsets along with some rowsets that should be recycled
    // call inverted check after do recycle which would sweep all the rowsets not visible
    auto accessor = checker.accessor_map_.begin()->second;
    for (int t = 10001; t <= 10100; ++t) {
        for (int v = 0; v < 10; ++v) {
            create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 1);
        }
    }
    // Schemas with 0..4 inverted indexes, cycled over the rowsets below.
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    // Create some rowsets not visible in S3
    constexpr int table_id = 10101, index_id = 10102, partition_id = 10103, tablet_id = 10104;
    create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id);
    for (int i = 0; i < 500; ++i) {
        auto rowset = create_rowset("recycle_tablet", tablet_id, index_id, 5, schemas[i % 5]);
        create_recycle_rowset(txn_kv.get(), accessor.get(), rowset,
                              i % 10 < 2 ? RecycleRowsetPB::PREPARE : RecycleRowsetPB::COMPACT,
                              i & 1);
    }
    ASSERT_NE(checker.do_inverted_check(), 0);
}
| |
| TEST(CheckerTest, normal) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("1"); |
| InstanceChecker checker(txn_kv, instance_id); |
| ASSERT_EQ(checker.init(instance), 0); |
| |
| auto accessor = checker.accessor_map_.begin()->second; |
| for (int t = 10001; t <= 10100; ++t) { |
| for (int v = 0; v < 10; ++v) { |
| create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 1); |
| } |
| } |
| for (int t = 10101; t <= 10200; ++t) { |
| for (int v = 0; v < 10; ++v) { |
| create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 5); |
| } |
| } |
| ASSERT_EQ(checker.do_check(), 0); |
| } |
| |
| TEST(CheckerTest, abnormal) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("1"); |
| |
| InstanceChecker checker(txn_kv, instance_id); |
| ASSERT_EQ(checker.init(instance), 0); |
| |
| auto accessor = checker.accessor_map_.begin()->second; |
| for (int t = 10001; t <= 10100; ++t) { |
| for (int v = 0; v < 10; ++v) { |
| create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 1, 0); |
| } |
| } |
| for (int t = 10101; t <= 10200; ++t) { |
| for (int v = 0; v < 10; ++v) { |
| create_committed_rowset(txn_kv.get(), accessor.get(), "1", t, v, 5, 0); |
| } |
| } |
| |
| // Delete some objects |
| std::mt19937 gen(std::chrono::system_clock::now().time_since_epoch().count()); |
| std::vector<std::string> deleted_paths; |
| std::unique_ptr<ListIterator> list_iter; |
| ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(10001 + gen() % 100), &list_iter)); |
| auto file = list_iter->next(); |
| deleted_paths.push_back(file->path); |
| for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) { |
| if (gen() % 10 < 2) { |
| deleted_paths.push_back(std::move(file->path)); |
| } |
| } |
| ASSERT_EQ(0, accessor->list_directory(tablet_path_prefix(10101 + gen() % 100), &list_iter)); |
| for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) { |
| if (gen() % 10 < 2) { |
| deleted_paths.push_back(std::move(file->path)); |
| } |
| } |
| |
| ASSERT_EQ(0, accessor->delete_files(deleted_paths)); |
| |
| std::vector<std::string> lost_paths; |
| auto sp = SyncPoint::get_instance(); |
| SyncPoint::CallbackGuard guard; |
| sp->set_call_back( |
| "InstanceChecker.do_check1", |
| [&lost_paths](auto&& args) { |
| lost_paths.push_back(*try_any_cast<std::string*>(args[0])); |
| }, |
| &guard); |
| sp->enable_processing(); |
| |
| ASSERT_NE(checker.do_check(), 0); |
| EXPECT_EQ(deleted_paths, lost_paths); |
| } |
| |
| TEST(CheckerTest, multi_checker) { |
| config::recycle_concurrency = 2; |
| config::scan_instances_interval_seconds = 10; |
| config::recycle_job_lease_expired_ms = 1000; |
| auto mem_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(mem_kv->init(), 0); |
| |
| std::atomic_int count {0}; |
| auto sp = SyncPoint::get_instance(); |
| SyncPoint::CallbackGuard guard; |
| sp->set_call_back( |
| "InstanceChecker.do_check", |
| [&count](auto&&) { |
| sleep(1); |
| ++count; |
| }, |
| &guard); |
| sp->enable_processing(); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn)); |
| for (int i = 0; i < 10; ++i) { |
| InstanceInfoPB instance; |
| instance.set_instance_id(std::to_string(i)); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("1"); |
| InstanceKeyInfo key_info {std::to_string(i)}; |
| std::string key; |
| instance_key(key_info, &key); |
| std::string val = instance.SerializeAsString(); |
| txn->put(key, val); |
| } |
| ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit()); |
| |
| Checker c1(mem_kv); |
| c1.ip_port_ = "r1:p1"; |
| c1.start(); |
| Checker c2(mem_kv); |
| c2.ip_port_ = "r2:p2"; |
| c2.start(); |
| Checker c3(mem_kv); |
| c3.ip_port_ = "r3:p3"; |
| std::this_thread::sleep_for(std::chrono::milliseconds(500)); |
| c3.start(); |
| |
| std::this_thread::sleep_for(std::chrono::seconds(5)); |
| c1.stop(); |
| c2.stop(); |
| c3.stop(); |
| |
| ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn)); |
| for (int i = 0; i < 10; ++i) { |
| JobRecycleKeyInfo key_info {std::to_string(i)}; |
| JobRecyclePB job_info; |
| std::string key; |
| std::string val; |
| job_check_key(key_info, &key); |
| ASSERT_EQ(TxnErrorCode::TXN_OK, txn->get(key, &val)) << i; |
| ASSERT_TRUE(job_info.ParseFromString(val)); |
| EXPECT_EQ(JobRecyclePB::IDLE, job_info.status()); |
| EXPECT_GT(job_info.last_finish_time_ms(), 0); |
| std::cout << "host: " << job_info.ip_port() << " finish check job of instance_id: " << i |
| << std::endl; |
| } |
| EXPECT_EQ(count, 10); |
| } |
| |
TEST(CheckerTest, do_inspect) {
    // Verifies Checker::do_inspect picks the correct "last ctime" source:
    // the instance ctime when no job info exists, then the job's last_ctime_ms
    // once one is recorded, and (intended) an alarm once it is too old.
    using namespace std::chrono;
    {
        auto mem_kv = std::make_shared<MemTxnKv>();
        ASSERT_EQ(mem_kv->init(), 0);

        InstanceInfoPB instance;
        instance.set_instance_id(instance_id);
        instance.set_ctime(11111);
        auto obj_info = instance.add_obj_info();
        obj_info->set_id("1");

        Checker checker(mem_kv);
        checker.do_inspect(instance);

        {
            // empty job info
            // NOTE(review): the callback is registered AFTER the do_inspect call
            // above — presumably it is meant to observe the later invocations in
            // the following scopes; confirm this ordering is intentional.
            auto sp = SyncPoint::get_instance();
            std::unique_ptr<int, std::function<void(int*)>> defer(
                    (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
            sp->set_call_back("Checker:do_inspect", [](auto&& args) {
                auto last_ctime = *try_any_cast<int64_t*>(args[0]);
                ASSERT_EQ(last_ctime, 11111);
                std::cout << "last_ctime: " << last_ctime << std::endl;
            });
            sp->enable_processing();
        }

        {
            // add job_info but no last ctime
            // With job info present but last_ctime_ms unset, do_inspect should
            // still fall back to the instance ctime (11111).
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn));
            JobRecyclePB job_info;
            job_info.set_instance_id(instance_id);
            std::string key = job_check_key({instance_id});
            std::string val = job_info.SerializeAsString();
            txn->put(key, val);
            ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
            checker.do_inspect(instance);
            auto sp = SyncPoint::get_instance();
            std::unique_ptr<int, std::function<void(int*)>> defer(
                    (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
            sp->set_call_back("Checker:do_inspect", [](auto&& args) {
                ASSERT_EQ(*try_any_cast<int64_t*>(args[0]), 11111);
            });
            sp->enable_processing();
        }

        {
            // add job_info with last ctime
            // Once last_ctime_ms is set (12345), do_inspect must report it
            // instead of the instance ctime.
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn));
            JobRecyclePB job_info;
            job_info.set_instance_id(instance_id);
            job_info.set_last_ctime_ms(12345);
            auto sp = SyncPoint::get_instance();
            std::unique_ptr<int, std::function<void(int*)>> defer(
                    (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
            sp->set_call_back("Checker:do_inspect", [](auto&& args) {
                ASSERT_EQ(*try_any_cast<int64_t*>(args[0]), 12345);
            });
            sp->enable_processing();
            std::string key = job_check_key({instance_id});
            std::string val = job_info.SerializeAsString();
            txn->put(key, val);
            ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
            checker.do_inspect(instance);
        }
        {
            // alarm
            // Make last_ctime_ms older than the expiration window so do_inspect
            // should raise an alarm (assertion currently disabled, see FIXME).
            int64_t expiration_ms = 7 > config::reserved_buffer_days
                                            ? (7 - config::reserved_buffer_days) * 3600000
                                            : 7 * 3600000;
            auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(TxnErrorCode::TXN_OK, mem_kv->create_txn(&txn));
            JobRecyclePB job_info;
            job_info.set_instance_id(instance_id);
            job_info.set_last_ctime_ms(now - expiration_ms - 10);
            auto sp = SyncPoint::get_instance();
            std::unique_ptr<int, std::function<void(int*)>> defer(
                    (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });

            bool alarm = false;
            sp->set_call_back("Checker:do_inspect", [&alarm](auto&&) { alarm = true; });
            sp->enable_processing();
            std::string key = job_check_key({instance_id});
            std::string val = job_info.SerializeAsString();
            txn->put(key, val);
            ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
            checker.do_inspect(instance);
            // FIXME(plat1ko): Unify SyncPoint in be and cloud
            //ASSERT_TRUE(alarm);
        }
    }
}
| |
| TEST(CheckerTest, delete_bitmap_inverted_check_normal) { |
| // normal case, all delete bitmaps belong to a rowset |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("1"); |
| |
| InstanceChecker checker(txn_kv, instance_id); |
| ASSERT_EQ(checker.init(instance), 0); |
| auto accessor = checker.accessor_map_.begin()->second; |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn)); |
| |
| constexpr int table_id = 10000, index_id = 10001, partition_id = 10002; |
| // create some rowsets with delete bitmaps in merge-on-write tablet |
| for (int tablet_id = 600001; tablet_id <= 600010; ++tablet_id) { |
| ASSERT_EQ(0, |
| create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true)); |
| int64_t rowset_start_id = 400; |
| for (int ver = 2; ver <= 10; ++ver) { |
| std::string rowset_id = std::to_string(rowset_start_id++); |
| create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id, |
| ver, ver, rowset_id, false, 1); |
| if (ver >= 5) { |
| auto delete_bitmap_key = |
| meta_delete_bitmap_key({instance_id, tablet_id, rowset_id, ver, 0}); |
| std::string delete_bitmap_val {"test"}; |
| txn->put(delete_bitmap_key, delete_bitmap_val); |
| } else { |
| // delete bitmaps may be spilitted into mulitiple KVs if too large |
| auto delete_bitmap_key = |
| meta_delete_bitmap_key({instance_id, tablet_id, rowset_id, ver, 0}); |
| std::string delete_bitmap_val(1000, 'A'); |
| cloud::put(txn.get(), delete_bitmap_key, delete_bitmap_val, 0, 300); |
| } |
| } |
| } |
| |
| // also create some rowsets without delete bitmaps in non merge-on-write tablet |
| for (int tablet_id = 700001; tablet_id <= 700010; ++tablet_id) { |
| ASSERT_EQ(0, |
| create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, false)); |
| int64_t rowset_start_id = 500; |
| for (int ver = 2; ver < 10; ++ver) { |
| std::string rowset_id = std::to_string(rowset_start_id++); |
| create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id, |
| ver, ver, rowset_id, false, 1); |
| } |
| } |
| |
| ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit()); |
| |
| ASSERT_EQ(checker.do_delete_bitmap_inverted_check(), 0); |
| } |
| |
TEST(CheckerTest, delete_bitmap_inverted_check_abnormal) {
    // abnormal case, some delete bitmaps are leaked
    // Builds tablets where (a) MoW tablets have delete bitmaps with no matching
    // rowset (leaked) and (b) non-MoW tablets have delete bitmaps that should
    // not exist at all (abnormal); verifies the check reports exactly those.
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("1");

    InstanceChecker checker(txn_kv, instance_id);
    ASSERT_EQ(checker.init(instance), 0);
    auto accessor = checker.accessor_map_.begin()->second;

    // tablet_id -> [rowset_id, version, segment_id]
    std::map<std::int64_t, std::set<std::tuple<std::string, int64_t, int64_t>>>
            expected_abnormal_delete_bitmaps {}, real_abnormal_delete_bitmaps {};
    std::map<std::int64_t, std::set<std::tuple<std::string, int64_t, int64_t>>>
            expected_leaked_delete_bitmaps {}, real_leaked_delete_bitmaps {};
    // Capture what the checker reports through its sync points so it can be
    // compared against the expectations built below.
    auto sp = SyncPoint::get_instance();
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
    sp->set_call_back(
            "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
            [&real_abnormal_delete_bitmaps](auto&& args) {
                int64_t tablet_id = *try_any_cast<int64_t*>(args[0]);
                std::string rowset_id = *try_any_cast<std::string*>(args[1]);
                int64_t version = *try_any_cast<int64_t*>(args[2]);
                int64_t segment_id = *try_any_cast<int64_t*>(args[3]);
                real_abnormal_delete_bitmaps[tablet_id].insert({rowset_id, version, segment_id});
            });
    sp->set_call_back(
            "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
            [&real_leaked_delete_bitmaps](auto&& args) {
                int64_t tablet_id = *try_any_cast<int64_t*>(args[0]);
                std::string rowset_id = *try_any_cast<std::string*>(args[1]);
                int64_t version = *try_any_cast<int64_t*>(args[2]);
                int64_t segment_id = *try_any_cast<int64_t*>(args[3]);
                real_leaked_delete_bitmaps[tablet_id].insert({rowset_id, version, segment_id});
            });
    sp->enable_processing();

    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));

    constexpr int table_id = 10000, index_id = 10001, partition_id = 10002;
    // create some rowsets with delete bitmaps in merge-on-write tablet
    // Versions < 10 get a delete bitmap but NO rowset -> expected to be leaked.
    for (int tablet_id = 800001; tablet_id <= 800010; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true));
        int64_t rowset_start_id = 600;
        for (int ver = 2; ver <= 20; ++ver) {
            std::string rowset_id = std::to_string(rowset_start_id++);

            if (ver >= 10) {
                // only create rowsets for some versions
                create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                       ver, ver, rowset_id, false, 1);
            } else {
                expected_leaked_delete_bitmaps[tablet_id].insert({rowset_id, ver, 0});
            }

            if (ver >= 5) {
                auto delete_bitmap_key =
                        meta_delete_bitmap_key({instance_id, tablet_id, rowset_id, ver, 0});
                std::string delete_bitmap_val {"test"};
                txn->put(delete_bitmap_key, delete_bitmap_val);
            } else {
                // delete bitmaps may be split into multiple KVs if too large
                auto delete_bitmap_key =
                        meta_delete_bitmap_key({instance_id, tablet_id, rowset_id, ver, 0});
                std::string delete_bitmap_val(1000, 'A');
                cloud::put(txn.get(), delete_bitmap_key, delete_bitmap_val, 0, 300);
            }
        }
    }

    // create some rowsets with delete bitmaps in non merge-on-write tablet
    // Delete bitmaps on non-MoW tablets are invalid -> expected to be abnormal.
    for (int tablet_id = 900001; tablet_id <= 900010; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, false));
        int64_t rowset_start_id = 700;
        for (int ver = 2; ver < 6; ++ver) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   ver, ver, rowset_id, false, 1);
            auto delete_bitmap_key =
                    meta_delete_bitmap_key({instance_id, tablet_id, rowset_id, ver, 0});
            std::string delete_bitmap_val {"test2"};
            txn->put(delete_bitmap_key, delete_bitmap_val);

            expected_abnormal_delete_bitmaps[tablet_id].insert({rowset_id, ver, 0});
        }
    }

    // create some rowsets without delete bitmaps in non merge-on-write tablet
    // These are fully consistent and must not be reported.
    for (int tablet_id = 700001; tablet_id <= 700010; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, false));
        int64_t rowset_start_id = 500;
        for (int ver = 2; ver < 10; ++ver) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   ver, ver, rowset_id, false, 1);
        }
    }

    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());

    // Non-zero return signals inconsistencies; reported sets must match exactly.
    ASSERT_EQ(checker.do_delete_bitmap_inverted_check(), 1);
    ASSERT_EQ(expected_leaked_delete_bitmaps, real_leaked_delete_bitmaps);
    ASSERT_EQ(expected_abnormal_delete_bitmaps, real_abnormal_delete_bitmaps);
}
| |
TEST(CheckerTest, delete_bitmap_storage_optimize_check_normal) {
    // Storage-optimize check on tablets whose delete bitmap version ranges are
    // all consistent with their rowset layout; the check must pass (return 0).
    config::delete_bitmap_storage_optimize_check_version_gap = 0;

    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("1");

    InstanceChecker checker(txn_kv, instance_id);
    ASSERT_EQ(checker.init(instance), 0);
    auto accessor = checker.accessor_map_.begin()->second;

    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));

    constexpr int table_id = 10000, index_id = 10001, partition_id = 10002;
    int64_t rowset_start_id = 600;

    // Scenario 1: a [6-7] compaction output among singleton rowsets; delete
    // bitmap start versions never precede the compaction output.
    for (int tablet_id = 800001; tablet_id <= 800005; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true));
        std::vector<std::pair<int64_t, int64_t>> rowset_vers {{2, 2}, {3, 3}, {4, 4}, {5, 5},
                                                              {6, 7}, {8, 8}, {9, 9}};
        std::vector<std::pair<int64_t, int64_t>> delete_bitmaps_vers {
                {7, 9}, {8, 9}, {7, 9}, {7, 9}, {7, 9}, {8, 9}, {9, 9}};
        std::vector<bool> segments_overlap {true, true, true, true, false, true, true};
        for (size_t i {0}; i < 7; i++) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   rowset_vers[i].first, rowset_vers[i].second,
                                                   rowset_id, segments_overlap[i], 1);
            create_delete_bitmaps(txn.get(), tablet_id, rowset_id, delete_bitmaps_vers[i].first,
                                  delete_bitmaps_vers[i].second);
        }
    }

    // Scenario 2: cumu compaction output with start_version == end_version.
    for (int tablet_id = 800006; tablet_id <= 800010; ++tablet_id) {
        // [7-7] cumu compaction output rowset start_version == end_version
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true));
        std::vector<std::pair<int64_t, int64_t>> rowset_vers {{2, 2}, {3, 3}, {4, 4}, {5, 5},
                                                              {6, 6}, {7, 7}, {8, 8}, {9, 9}};
        std::vector<std::pair<int64_t, int64_t>> delete_bitmaps_vers {
                {7, 9}, {8, 9}, {7, 9}, {7, 9}, {7, 9}, {7, 9}, {8, 9}, {9, 9}};
        std::vector<bool> segments_overlap {true, true, false, true, false, true, true, true};
        for (size_t i {0}; i < 8; i++) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   rowset_vers[i].first, rowset_vers[i].second,
                                                   rowset_id, segments_overlap[i], 1);
            create_delete_bitmaps(txn.get(), tablet_id, rowset_id, delete_bitmaps_vers[i].first,
                                  delete_bitmaps_vers[i].second);
        }
    }

    // Scenario 3: no compaction at all; each rowset's delete bitmap starts at
    // its own version.
    for (int tablet_id = 800011; tablet_id <= 800015; ++tablet_id) {
        // no rowsets are compacted
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true));
        std::vector<std::pair<int64_t, int64_t>> rowset_vers {{2, 2}, {3, 3}, {4, 4}, {5, 5},
                                                              {6, 6}, {7, 7}, {8, 8}, {9, 9}};
        std::vector<std::pair<int64_t, int64_t>> delete_bitmaps_vers {
                {2, 9}, {3, 9}, {4, 9}, {5, 9}, {6, 9}, {7, 9}, {8, 9}, {9, 9}};
        std::vector<bool> segments_overlap {true, true, true, true, true, true, true, true};
        for (size_t i {0}; i < 8; i++) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   rowset_vers[i].first, rowset_vers[i].second,
                                                   rowset_id, segments_overlap[i], 1);
            create_delete_bitmaps(txn.get(), tablet_id, rowset_id, delete_bitmaps_vers[i].first,
                                  delete_bitmaps_vers[i].second);
        }
    }

    // Scenario 4: a wide [2-5] base rowset followed by singletons.
    for (int tablet_id = 800016; tablet_id <= 800020; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true));
        std::vector<std::pair<int64_t, int64_t>> rowset_vers {
                {2, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}};
        std::vector<std::pair<int64_t, int64_t>> delete_bitmaps_vers {
                {5, 9}, {6, 9}, {7, 9}, {8, 9}, {9, 9}};
        std::vector<bool> segments_overlap {false, true, true, true, true};
        for (size_t i {0}; i < 5; i++) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   rowset_vers[i].first, rowset_vers[i].second,
                                                   rowset_id, segments_overlap[i], 1);
            create_delete_bitmaps(txn.get(), tablet_id, rowset_id, delete_bitmaps_vers[i].first,
                                  delete_bitmaps_vers[i].second);
        }
    }

    // also create some rowsets without delete bitmaps in non merge-on-write tablet
    for (int tablet_id = 700001; tablet_id <= 700010; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, false));
        int64_t rowset_start_id = 500;
        for (int ver = 2; ver < 10; ++ver) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   ver, ver, rowset_id, false, 1);
        }
    }

    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
    ASSERT_EQ(checker.do_delete_bitmap_storage_optimize_check(), 0);
}
| |
TEST(CheckerTest, delete_bitmap_storage_optimize_check_abnormal) {
    config::delete_bitmap_storage_optimize_check_version_gap = 0;
    // abnormal case, some rowsets' delete bitmaps are not deleted as expected
    // Rowsets whose delete bitmap start version precedes the latest compaction
    // output (version 7 here) must be flagged as abnormal by the check.
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("1");

    InstanceChecker checker(txn_kv, instance_id);
    ASSERT_EQ(checker.init(instance), 0);
    auto accessor = checker.accessor_map_.begin()->second;

    // tablet_id -> [rowset_id]
    // Capture what the checker reports through its sync point so it can be
    // compared against the expectation built below.
    std::map<std::int64_t, std::set<std::string>> expected_abnormal_rowsets {};
    std::map<std::int64_t, std::set<std::string>> real_abnormal_rowsets {};
    auto sp = SyncPoint::get_instance();
    std::unique_ptr<int, std::function<void(int*)>> defer(
            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
    sp->set_call_back("InstanceChecker::check_delete_bitmap_storage_optimize.get_abnormal_rowset",
                      [&real_abnormal_rowsets](auto&& args) {
                          int64_t tablet_id = *try_any_cast<int64_t*>(args[0]);
                          std::string rowset_id = *try_any_cast<std::string*>(args[1]);
                          real_abnormal_rowsets[tablet_id].insert(rowset_id);
                      });
    sp->enable_processing();

    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));

    constexpr int table_id = 10000, index_id = 10001, partition_id = 10002;

    int64_t rowset_start_id = 700;
    for (int tablet_id = 900001; tablet_id <= 900005; ++tablet_id) {
        ASSERT_EQ(0,
                  create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id, true));
        std::vector<std::pair<int64_t, int64_t>> rowset_vers {{2, 2}, {3, 3}, {4, 4}, {5, 5},
                                                              {6, 7}, {8, 8}, {9, 9}};
        // Bitmaps starting at version < 7 (the [6-7] compaction output) should
        // have been pruned already -> abnormal.
        std::vector<std::pair<int64_t, int64_t>> delete_bitmaps_vers {
                {2, 9}, {7, 9}, {4, 9}, {7, 9}, {7, 9}, {8, 9}, {9, 9}};
        std::vector<bool> segments_overlap {true, true, true, true, false, true, true};
        for (size_t i {0}; i < 7; i++) {
            std::string rowset_id = std::to_string(rowset_start_id++);
            create_committed_rowset_with_rowset_id(txn_kv.get(), accessor.get(), "1", tablet_id,
                                                   rowset_vers[i].first, rowset_vers[i].second,
                                                   rowset_id, segments_overlap[i], 1);
            create_delete_bitmaps(txn.get(), tablet_id, rowset_id, delete_bitmaps_vers[i].first,
                                  delete_bitmaps_vers[i].second);
            if (delete_bitmaps_vers[i].first < 7) {
                expected_abnormal_rowsets[tablet_id].insert(rowset_id);
            }
        }
    }

    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());

    // Non-zero return signals inconsistencies; the reported set must match.
    ASSERT_EQ(checker.do_delete_bitmap_storage_optimize_check(), 1);
    ASSERT_EQ(expected_abnormal_rowsets, real_abnormal_rowsets);
}
| |
TEST(RecyclerTest, delete_rowset_data) {
    // Exercises the three delete_rowset_data overloads (single RowsetPB, batch
    // of RowsetPBs, and resource/tablet/rowset-id triple) against a real S3
    // vault, verifying each leaves the storage empty afterwards.
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    // Instance backed by the test S3 config (requires real credentials to run).
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_tmp_rowsets");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_tmp_rowsets");

    // Schemas with 0..4 inverted indexes (V1 storage format), cycled below so
    // rowset deletion also covers inverted index files.
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        int64_t txn_id_base = 114115;
        int64_t tablet_id_base = 10015;
        int64_t index_id_base = 1000;
        // Delete each rowset directly using one RowsetPB
        for (int i = 0; i < 100; ++i) {
            int64_t txn_id = txn_id_base + i;
            for (int j = 0; j < 20; ++j) {
                auto rowset = create_rowset("recycle_tmp_rowsets", tablet_id_base + j,
                                            index_id_base + j % 4, 5, schemas[i % 5], txn_id);
                create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, i & 1);
                ASSERT_EQ(0, recycler.delete_rowset_data(rowset));
            }
        }

        // All created objects must have been removed.
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    {
        // Separate instance config for the batch-deletion scope.
        InstanceInfoPB tmp_instance;
        std::string resource_id = "recycle_tmp_rowsets";
        tmp_instance.set_instance_id(instance_id);
        auto tmp_obj_info = tmp_instance.add_obj_info();
        tmp_obj_info->set_id(resource_id);
        tmp_obj_info->set_ak(config::test_s3_ak);
        tmp_obj_info->set_sk(config::test_s3_sk);
        tmp_obj_info->set_endpoint(config::test_s3_endpoint);
        tmp_obj_info->set_region(config::test_s3_region);
        tmp_obj_info->set_bucket(config::test_s3_bucket);
        tmp_obj_info->set_prefix(resource_id);

        InstanceRecycler recycler(txn_kv, tmp_instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        // Delete multiple rowset files using one series of RowsetPB
        constexpr int index_id = 10001, tablet_id = 10002;
        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
        for (int i = 0; i < 10; ++i) {
            auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, schemas[i % 5]);
            // Cycle through every RecycleRowsetPB::Type to cover all variants.
            create_recycle_rowset(
                    txn_kv.get(), accessor.get(), rowset,
                    static_cast<RecycleRowsetPB::Type>(i % (RecycleRowsetPB::Type_MAX + 1)), true);

            rowset_pbs.emplace_back(std::move(rowset));
        }
        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET));
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        // Delete multiple rowset files using one series of RowsetPB
        constexpr int index_id = 20001, tablet_id = 20002;
        // Delete each rowset file directly using it's id to construct one path
        for (int i = 0; i < 1000; ++i) {
            auto rowset =
                    create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
            create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, RecycleRowsetPB::COMPACT,
                                  true);
            ASSERT_EQ(0, recycler.delete_rowset_data(rowset.resource_id(), rowset.tablet_id(),
                                                     rowset.rowset_id_v2()));
        }
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
}
| |
// Covers rowset data deletion when tablet schemas declare INVERTED indexes but
// leave inverted_index_storage_format unset (the commented-out setter below is
// deliberate). Exercises all three delete_rowset_data() overloads and verifies
// every file under the object-store prefix has been removed afterwards.
TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    // Instance with a single S3-backed obj_info whose id/prefix is
    // "recycle_tmp_rowsets"; credentials come from the test config.
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_tmp_rowsets");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_tmp_rowsets");

    // Five schemas; schema i carries i INVERTED indexes. The storage format
    // field is intentionally NOT set, so the recycler sees the default.
    std::vector<doris::TabletSchemaCloudPB> schemas;
    for (int i = 0; i < 5; ++i) {
        auto& schema = schemas.emplace_back();
        schema.set_schema_version(i);
        //schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);
        for (int j = 0; j < i; ++j) {
            auto index = schema.add_index();
            index->set_index_id(j);
            index->set_index_type(IndexType::INVERTED);
        }
    }

    // Case 1: delete tmp rowsets one at a time via the RowsetMetaCloudPB overload.
    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        int64_t txn_id_base = 114115;
        int64_t tablet_id_base = 10015;
        int64_t index_id_base = 1000;
        // Delete each rowset directly using one RowsetPB
        for (int i = 0; i < 100; ++i) {
            int64_t txn_id = txn_id_base + i;
            for (int j = 0; j < 20; ++j) {
                auto rowset = create_rowset("recycle_tmp_rowsets", tablet_id_base + j,
                                            index_id_base + j % 4, 5, schemas[i % 5], txn_id);
                create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, i & 1);
                ASSERT_EQ(0, recycler.delete_rowset_data(rowset));
            }
        }

        // All uploaded files must be gone.
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    // Case 2: delete a batch of formal rowsets via the vector overload,
    // cycling through every RecycleRowsetPB::Type value.
    {
        InstanceInfoPB tmp_instance;
        std::string resource_id = "recycle_tmp_rowsets";
        tmp_instance.set_instance_id(instance_id);
        auto tmp_obj_info = tmp_instance.add_obj_info();
        tmp_obj_info->set_id(resource_id);
        tmp_obj_info->set_ak(config::test_s3_ak);
        tmp_obj_info->set_sk(config::test_s3_sk);
        tmp_obj_info->set_endpoint(config::test_s3_endpoint);
        tmp_obj_info->set_region(config::test_s3_region);
        tmp_obj_info->set_bucket(config::test_s3_bucket);
        tmp_obj_info->set_prefix(resource_id);

        InstanceRecycler recycler(txn_kv, tmp_instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        // Delete multiple rowset files using one series of RowsetPB
        constexpr int index_id = 10001, tablet_id = 10002;
        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
        for (int i = 0; i < 10; ++i) {
            auto rowset = create_rowset(resource_id, tablet_id, index_id, 5, schemas[i % 5]);
            create_recycle_rowset(
                    txn_kv.get(), accessor.get(), rowset,
                    static_cast<RecycleRowsetPB::Type>(i % (RecycleRowsetPB::Type_MAX + 1)), true);

            rowset_pbs.emplace_back(std::move(rowset));
        }
        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET));
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    // Case 3: delete by (resource_id, tablet_id, rowset_id) triple.
    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        // Delete multiple rowset files using one series of RowsetPB
        constexpr int index_id = 20001, tablet_id = 20002;
        // Delete each rowset file directly using its id to construct one path
        for (int i = 0; i < 1000; ++i) {
            auto rowset =
                    create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5, schemas[i % 5]);
            create_recycle_rowset(txn_kv.get(), accessor.get(), rowset, RecycleRowsetPB::COMPACT,
                                  true);
            ASSERT_EQ(0, recycler.delete_rowset_data(rowset.resource_id(), rowset.tablet_id(),
                                                     rowset.rowset_id_v2()));
        }
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
}
| |
// Mixes four storage vaults: an S3 vault whose conf parsing fails, an S3 vault
// that inits but cannot connect, an HDFS vault whose init is forced to fail,
// and an HDFS vault replaced by a MockAccessor via sync point. Verifies that
// InstanceRecycler::init() tolerates partial failures (2 of 4 accessors built)
// and that recycle_tablet() fails overall while the working accessor still
// recycles its data.
TEST(RecyclerTest, init_vault_accessor_failed_test) {
    auto* sp = SyncPoint::get_instance();
    // Scope guard: reset all SyncPoint callbacks/state when the test exits.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });

    auto txn_kv = std::make_shared<MemTxnKv>();
    EXPECT_EQ(txn_kv->init(), 0);
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::string key;
    std::string val;

    InstanceKeyInfo key_info {"test_instance"};
    instance_key(key_info, &key);
    InstanceInfoPB instance;
    instance.set_instance_id("GetObjStoreInfoTestInstance");
    // failed to init because S3Conf::from_obj_store_info() fails
    // (no endpoint/region/bucket/provider set)
    {
        ObjectStoreInfoPB obj_info;
        StorageVaultPB vault;
        obj_info.set_id("id");
        obj_info.set_ak("ak");
        obj_info.set_sk("sk");
        vault.mutable_obj_info()->MergeFrom(obj_info);
        vault.set_name("test_failed_s3_vault_1");
        vault.set_id("failed_s3_1");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString());
    }

    // succeed to init but unuseful: provider is set so conf parsing passes,
    // yet the fake credentials/endpoint mean any access attempt fails
    {
        ObjectStoreInfoPB obj_info;
        StorageVaultPB vault;
        obj_info.set_id("id");
        obj_info.set_ak("ak");
        obj_info.set_sk("sk");
        obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
        vault.mutable_obj_info()->MergeFrom(obj_info);
        vault.set_name("test_failed_s3_vault_2");
        vault.set_id("failed_s3_2");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        instance.set_instance_id("GetObjStoreInfoTestInstance");
        txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString());
    }

    // failed to init because accessor->init() fails
    // (forced by the hdfs_init_failed sync point below)
    {
        HdfsBuildConf hdfs_build_conf;
        StorageVaultPB vault;
        hdfs_build_conf.set_fs_name("fs_name");
        hdfs_build_conf.set_user("root");
        HdfsVaultInfo hdfs_info;
        hdfs_info.set_prefix("root_path");
        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
        vault.set_name("test_failed_hdfs_vault_1");
        vault.set_id("failed_hdfs_1");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        instance.set_instance_id("GetObjStoreInfoTestInstance");
        txn->put(storage_vault_key({instance.instance_id(), "3"}), vault.SerializeAsString());
    }

    // MockAccessor pre-seeded with one data file; substituted for the
    // "test_success_hdfs_vault" vault during init via sync point.
    auto accessor = std::make_shared<MockAccessor>();
    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
    sp->set_call_back(
            "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) {
                auto* map = try_any_cast<
                        std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>(
                        args[0]);
                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
                if (vault->name() == "test_success_hdfs_vault") {
                    map->emplace(vault->id(), accessor);
                }
            });
    // Inject rowset metas: one on the unreachable S3 vault, one on the mock.
    sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) {
        auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
        auto* rs = resp->add_rowset_meta();
        rs->set_resource_id("failed_s3_2");
        rs = resp->add_rowset_meta();
        rs->set_resource_id("success_vault");
    });
    // Force HdfsAccessor::init() to return -1 for real HDFS vaults.
    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
        auto* ret = try_any_cast_ret<int>(args);
        ret->first = -1;
        ret->second = true;
    });
    sp->enable_processing();

    // succeed to init MockAccessor
    {
        HdfsBuildConf hdfs_build_conf;
        StorageVaultPB vault;
        hdfs_build_conf.set_fs_name("fs_name");
        hdfs_build_conf.set_user("root");
        HdfsVaultInfo hdfs_info;
        hdfs_info.set_prefix("root_path");
        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
        vault.set_name("test_success_hdfs_vault");
        vault.set_id("success_vault");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        instance.set_instance_id("GetObjStoreInfoTestInstance");
        txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString());
    }

    val = instance.SerializeAsString();
    txn->put(key, val);
    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    // Only 2 of the 4 vaults yield accessors: the unreachable S3 vault and
    // the mocked HDFS vault.
    EXPECT_EQ(recycler.init(), 0);
    EXPECT_EQ(recycler.accessor_map_.size(), 2);

    // unusable obj accessor: exists() fails because it cannot connect
    EXPECT_EQ(recycler.accessor_map_.at("failed_s3_2")->exists("data/0/test.csv"), -1);
    // useful mock accessor
    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0);

    // recycle tablet fails overall because the unusable obj accessor cannot connect
    EXPECT_EQ(recycler.recycle_tablet(0), -1);
    // however, the useful mock accessor did recycle its tablet data
    // (exists() now reports the file is gone)
    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 1);
}
| |
| TEST(RecyclerTest, recycle_tablet_without_resource_id) { |
| auto* sp = SyncPoint::get_instance(); |
| std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| }); |
| |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| EXPECT_EQ(txn_kv->init(), 0); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string key; |
| std::string val; |
| |
| InstanceKeyInfo key_info {"test_instance"}; |
| instance_key(key_info, &key); |
| InstanceInfoPB instance; |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| |
| auto accessor = std::make_shared<MockAccessor>(); |
| EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0); |
| sp->set_call_back( |
| "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) { |
| auto* map = try_any_cast< |
| std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>( |
| args[0]); |
| auto* vault = try_any_cast<StorageVaultPB*>(args[1]); |
| if (vault->name() == "test_success_hdfs_vault") { |
| map->emplace(vault->id(), accessor); |
| } |
| }); |
| sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) { |
| auto* resp = try_any_cast<GetRowsetResponse*>(args[0]); |
| auto* rs = resp->add_rowset_meta(); |
| EXPECT_EQ(rs->has_resource_id(), false); |
| }); |
| sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { |
| auto* ret = try_any_cast_ret<int>(args); |
| ret->first = -1; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| // succeed to init MockAccessor |
| { |
| HdfsBuildConf hdfs_build_conf; |
| StorageVaultPB vault; |
| hdfs_build_conf.set_fs_name("fs_name"); |
| hdfs_build_conf.set_user("root"); |
| HdfsVaultInfo hdfs_info; |
| hdfs_info.set_prefix("root_path"); |
| hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); |
| vault.mutable_hdfs_info()->MergeFrom(hdfs_info); |
| vault.set_name("test_success_hdfs_vault"); |
| vault.set_id("success_vault"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString()); |
| } |
| |
| val = instance.SerializeAsString(); |
| txn->put(key, val); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| EXPECT_EQ(accessor->exists("data/0/test.csv"), 0); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| EXPECT_EQ(recycler.init(), 0); |
| EXPECT_EQ(recycler.accessor_map_.size(), 1); |
| |
| // useful mock accessor |
| EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0); |
| |
| // recycle tablet will fail because unuseful obj accessor can not connectted |
| EXPECT_EQ(recycler.recycle_tablet(0), -1); |
| // no resource id, cannot recycle |
| EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0); |
| } |
| |
| TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id) { |
| auto* sp = SyncPoint::get_instance(); |
| std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| }); |
| |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| EXPECT_EQ(txn_kv->init(), 0); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string key; |
| std::string val; |
| |
| InstanceKeyInfo key_info {"test_instance"}; |
| instance_key(key_info, &key); |
| InstanceInfoPB instance; |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| |
| auto accessor = std::make_shared<MockAccessor>(); |
| EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0); |
| sp->set_call_back( |
| "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) { |
| auto* map = try_any_cast< |
| std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>( |
| args[0]); |
| auto* vault = try_any_cast<StorageVaultPB*>(args[1]); |
| if (vault->name() == "test_success_hdfs_vault") { |
| map->emplace(vault->id(), accessor); |
| } |
| }); |
| sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) { |
| auto* resp = try_any_cast<GetRowsetResponse*>(args[0]); |
| auto* rs = resp->add_rowset_meta(); |
| rs->set_resource_id("no_id"); |
| }); |
| sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { |
| auto* ret = try_any_cast_ret<int>(args); |
| ret->first = -1; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| // succeed to init MockAccessor |
| { |
| HdfsBuildConf hdfs_build_conf; |
| StorageVaultPB vault; |
| hdfs_build_conf.set_fs_name("fs_name"); |
| hdfs_build_conf.set_user("root"); |
| HdfsVaultInfo hdfs_info; |
| hdfs_info.set_prefix("root_path"); |
| hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); |
| vault.mutable_hdfs_info()->MergeFrom(hdfs_info); |
| vault.set_name("test_success_hdfs_vault"); |
| vault.set_id("success_vault"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString()); |
| } |
| |
| val = instance.SerializeAsString(); |
| txn->put(key, val); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| EXPECT_EQ(accessor->exists("data/0/test.csv"), 0); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| EXPECT_EQ(recycler.init(), 0); |
| EXPECT_EQ(recycler.accessor_map_.size(), 1); |
| |
| // useful mock accessor |
| EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0); |
| |
| // recycle tablet will fail because unuseful obj accessor can not connectted |
| EXPECT_EQ(recycler.recycle_tablet(0), -1); |
| // no resource id, cannot recycle |
| EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0); |
| } |
| |
| TEST(RecyclerTest, init_all_vault_accessors_failed_test) { |
| auto* sp = SyncPoint::get_instance(); |
| std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| }); |
| |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| EXPECT_EQ(txn_kv->init(), 0); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string key; |
| std::string val; |
| |
| InstanceKeyInfo key_info {"test_instance"}; |
| instance_key(key_info, &key); |
| InstanceInfoPB instance; |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| // failed to init because S3Conf::from_obj_store_info() fails |
| { |
| ObjectStoreInfoPB obj_info; |
| StorageVaultPB vault; |
| obj_info.set_id("id"); |
| obj_info.set_ak("ak"); |
| obj_info.set_sk("sk"); |
| vault.mutable_obj_info()->MergeFrom(obj_info); |
| vault.set_name("test_failed_s3_vault"); |
| vault.set_id("failed_s3"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString()); |
| } |
| |
| sp->set_call_back("S3Accessor::init.s3_init_failed", [](auto&& args) { |
| auto* ret = try_any_cast_ret<int>(args); |
| ret->first = -1; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| EXPECT_EQ(recycler.init(), -2); |
| } |
| |
| TEST(RecyclerTest, recycler_storage_vault_white_list_test) { |
| auto* sp = SyncPoint::get_instance(); |
| std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| }); |
| |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| EXPECT_EQ(txn_kv->init(), 0); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string key; |
| std::string val; |
| |
| InstanceKeyInfo key_info {"test_instance"}; |
| instance_key(key_info, &key); |
| InstanceInfoPB instance; |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| { |
| ObjectStoreInfoPB obj_info; |
| StorageVaultPB vault; |
| obj_info.set_id("id"); |
| obj_info.set_ak("ak"); |
| obj_info.set_sk("sk"); |
| obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); |
| vault.mutable_obj_info()->MergeFrom(obj_info); |
| vault.set_name("s3_1"); |
| vault.set_id("s3_1"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString()); |
| } |
| |
| { |
| ObjectStoreInfoPB obj_info; |
| StorageVaultPB vault; |
| obj_info.set_id("id"); |
| obj_info.set_ak("ak"); |
| obj_info.set_sk("sk"); |
| obj_info.set_provider(ObjectStoreInfoPB_Provider_COS); |
| vault.mutable_obj_info()->MergeFrom(obj_info); |
| vault.set_name("s3_2"); |
| vault.set_id("s3_2"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString()); |
| } |
| |
| { |
| HdfsBuildConf hdfs_build_conf; |
| StorageVaultPB vault; |
| hdfs_build_conf.set_fs_name("fs_name"); |
| hdfs_build_conf.set_user("root"); |
| HdfsVaultInfo hdfs_info; |
| hdfs_info.set_prefix("root_path"); |
| hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); |
| vault.mutable_hdfs_info()->MergeFrom(hdfs_info); |
| vault.set_name("hdfs_1"); |
| vault.set_id("hdfs_1"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| txn->put(storage_vault_key({instance.instance_id(), "3"}), vault.SerializeAsString()); |
| } |
| |
| { |
| HdfsBuildConf hdfs_build_conf; |
| StorageVaultPB vault; |
| hdfs_build_conf.set_fs_name("fs_name"); |
| hdfs_build_conf.set_user("root"); |
| HdfsVaultInfo hdfs_info; |
| hdfs_info.set_prefix("root_path"); |
| hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf); |
| vault.mutable_hdfs_info()->MergeFrom(hdfs_info); |
| vault.set_name("hdfs_2"); |
| vault.set_id("hdfs_2"); |
| instance.add_storage_vault_names(vault.name()); |
| instance.add_resource_ids(vault.id()); |
| instance.set_instance_id("GetObjStoreInfoTestInstance"); |
| txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString()); |
| } |
| |
| auto accessor = std::make_shared<MockAccessor>(); |
| EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0); |
| sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { |
| auto* ret = try_any_cast_ret<int>(args); |
| ret->first = 0; |
| ret->second = true; |
| }); |
| sp->set_call_back( |
| "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) { |
| auto* map = try_any_cast< |
| std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>( |
| args[0]); |
| auto* vault = try_any_cast<StorageVaultPB*>(args[1]); |
| map->emplace(vault->id(), accessor); |
| }); |
| sp->enable_processing(); |
| |
| val = instance.SerializeAsString(); |
| txn->put(key, val); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| EXPECT_EQ(accessor->exists("data/0/test.csv"), 0); |
| |
| { |
| config::recycler_storage_vault_white_list = {}; |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| EXPECT_EQ(recycler.init(), 0); |
| EXPECT_EQ(recycler.accessor_map_.size(), 4); |
| } |
| |
| { |
| config::recycler_storage_vault_white_list = {"s3_1", "s3_2", "hdfs_1", "hdfs_2"}; |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| EXPECT_EQ(recycler.init(), 0); |
| EXPECT_EQ(recycler.accessor_map_.size(), 4); |
| } |
| |
| { |
| config::recycler_storage_vault_white_list = {"s3_1", "hdfs_1"}; |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| EXPECT_EQ(recycler.init(), 0); |
| EXPECT_EQ(recycler.accessor_map_.size(), 2); |
| EXPECT_EQ(recycler.accessor_map_.at("s3_1")->exists("data/0/test.csv"), 0); |
| EXPECT_EQ(recycler.accessor_map_.at("hdfs_1")->exists("data/0/test.csv"), 0); |
| } |
| } |
| |
// With inverted index storage format V1 (separate per-index .idx files),
// recycling a tmp rowset whose schema can no longer be resolved (forced by
// the tmp_rowset sync point) must delete the segment file but leave the
// V1 index file behind, since its name cannot be derived without the schema.
TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
    auto* sp = SyncPoint::get_instance();
    // Scope guard: reset all SyncPoint callbacks/state when the test exits.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("delete_tmp_rowset_data_with_idx_v1");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("delete_tmp_rowset_data_with_idx_v1");

    // One schema with a single INVERTED index (index_id = 1).
    doris::TabletSchemaCloudPB schema;
    schema.set_schema_version(1);
    auto index = schema.add_index();
    index->set_index_id(1);
    index->set_index_type(IndexType::INVERTED);

    // Force the "schema unavailable" branch in delete_rowset_data().
    sp->set_call_back("InstanceRecycler::delete_rowset_data.tmp_rowset", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 1;
    });
    sp->enable_processing();

    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
        doris::RowsetMetaCloudPB rowset;
        rowset.set_rowset_id(0); // useless but required
        rowset.set_rowset_id_v2("1");
        rowset.set_num_segments(1);
        rowset.set_tablet_id(10000);
        rowset.set_index_id(10001);
        rowset.set_resource_id("delete_tmp_rowset_data_with_idx_v1");
        rowset.set_schema_version(schema.schema_version());
        rowset.mutable_tablet_schema()->CopyFrom(schema);
        create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1);
        // Strip the inline schema so the recycler must resolve it (and fails,
        // per the sync point above).
        rowset.clear_tablet_schema();
        rowset_pbs.emplace_back(rowset);

        std::unordered_set<std::string> list_files;
        std::unique_ptr<ListIterator> iter;
        EXPECT_EQ(accessor->list_all(&iter), 0);
        EXPECT_TRUE(iter->has_next());
        list_files.clear();
        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
            list_files.insert(file->path);
        }
        EXPECT_EQ(list_files.size(), 2);
        // before delete tmp rowset, segment file and idx v1 exist
        EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
        EXPECT_TRUE(list_files.contains("data/10000/1_0_1.idx"));

        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::TMP_ROWSET));
        list_files.clear();
        iter.reset();
        EXPECT_EQ(accessor->list_all(&iter), 0);
        EXPECT_TRUE(iter->has_next());
        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
            list_files.insert(file->path);
        }
        EXPECT_EQ(list_files.size(), 1);
        // after delete tmp rowset, idx v1 exists
        // (orphaned: its per-index filename could not be reconstructed)
        EXPECT_TRUE(list_files.contains("data/10000/1_0_1.idx"));
    }
}
| |
// Counterpart to delete_tmp_rowset_data_with_idx_v1: with index format V2
// (a single <rowset>_<seg>.idx file) the index filename does not depend on
// the schema, so both the segment and the index file are removed even when
// schema resolution is forced to fail.
TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
    auto* sp = SyncPoint::get_instance();
    // Scope guard: reset all SyncPoint callbacks/state when the test exits.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("delete_tmp_rowset_data_with_idx_v2");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("delete_tmp_rowset_data_with_idx_v2");

    // One schema with a single INVERTED index (index_id = 1).
    doris::TabletSchemaCloudPB schema;
    schema.set_schema_version(1);
    auto index = schema.add_index();
    index->set_index_id(1);
    index->set_index_type(IndexType::INVERTED);

    // Force the "schema unavailable" branch in delete_rowset_data().
    sp->set_call_back("InstanceRecycler::delete_rowset_data.tmp_rowset", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 1;
    });
    sp->enable_processing();

    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;
        doris::RowsetMetaCloudPB rowset;
        rowset.set_rowset_id(0); // useless but required
        rowset.set_rowset_id_v2("1");
        rowset.set_num_segments(1);
        rowset.set_tablet_id(10000);
        rowset.set_index_id(10001);
        rowset.set_resource_id("delete_tmp_rowset_data_with_idx_v2");
        rowset.set_schema_version(schema.schema_version());
        rowset.mutable_tablet_schema()->CopyFrom(schema);
        // Last argument true -> the helper writes a V2-style index file.
        create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
        // Strip the inline schema so the recycler must resolve it (and fails,
        // per the sync point above).
        rowset.clear_tablet_schema();
        rowset_pbs.emplace_back(rowset);

        std::unordered_set<std::string> list_files;
        std::unique_ptr<ListIterator> iter;
        EXPECT_EQ(accessor->list_all(&iter), 0);
        EXPECT_TRUE(iter->has_next());
        list_files.clear();
        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
            list_files.insert(file->path);
        }
        EXPECT_EQ(list_files.size(), 2);
        // before delete tmp rowset, segment file and idx v2 exist
        EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
        EXPECT_TRUE(list_files.contains("data/10000/1_0.idx"));

        ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::TMP_ROWSET));
        list_files.clear();
        iter.reset();
        EXPECT_EQ(accessor->list_all(&iter), 0);
        EXPECT_FALSE(iter->has_next());
        // NOTE(review): the iterator is already exhausted here, so this loop
        // is effectively dead; kept as a belt-and-braces collection step.
        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
            list_files.insert(file->path);
        }
        // after delete tmp rowset, both file and idx v2 are removed
        EXPECT_EQ(list_files.size(), 0);
    }
}
| |
| TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) { |
| auto* sp = SyncPoint::get_instance(); |
| std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) { |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| }); |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| ASSERT_EQ(txn_kv->init(), 0); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(instance_id); |
| auto obj_info = instance.add_obj_info(); |
| obj_info->set_id("delete_tmp_rowset_data_with_idx_v2"); |
| obj_info->set_ak(config::test_s3_ak); |
| obj_info->set_sk(config::test_s3_sk); |
| obj_info->set_endpoint(config::test_s3_endpoint); |
| obj_info->set_region(config::test_s3_region); |
| obj_info->set_bucket(config::test_s3_bucket); |
| obj_info->set_prefix("delete_tmp_rowset_data_with_idx_v2"); |
| |
| doris::TabletSchemaCloudPB schema; |
| schema.set_schema_version(1); |
| auto index = schema.add_index(); |
| index->set_index_id(1); |
| index->set_index_type(IndexType::INVERTED); |
| |
| sp->set_call_back("InstanceRecycler::delete_rowset_data.tmp_rowset", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 1; |
| }); |
| sp->set_call_back("InstanceRecycler::delete_rowset_data.no_resource_id", [](auto&& args) { |
| auto* map = try_any_cast< |
| std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>(args[0]); |
| map->erase("no_resource_id"); |
| ; |
| }); |
| sp->enable_processing(); |
| |
| { |
| InstanceRecycler recycler(txn_kv, instance, thread_group, |
| std::make_shared<TxnLazyCommitter>(txn_kv)); |
| ASSERT_EQ(recycler.init(), 0); |
| auto accessor = recycler.accessor_map_.begin()->second; |
| std::vector<doris::RowsetMetaCloudPB> rowset_pbs; |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // useless but required |
| rowset.set_rowset_id_v2("1"); |
| rowset.set_num_segments(1); |
| rowset.set_tablet_id(10000); |
| rowset.set_index_id(10001); |
| rowset.set_resource_id("delete_tmp_rowset_data_with_idx_v2"); |
| rowset.set_schema_version(schema.schema_version()); |
| rowset.mutable_tablet_schema()->CopyFrom(schema); |
| create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true); |
| rowset.clear_tablet_schema(); |
| rowset_pbs.emplace_back(rowset); |
| |
| rowset.set_rowset_id(0); // useless but required |
| rowset.set_rowset_id_v2("2"); |
| rowset.set_num_segments(1); |
| rowset.set_tablet_id(20000); |
| rowset.set_index_id(20001); |
| rowset.set_resource_id("no_resource_id"); |
| rowset.set_schema_version(schema.schema_version()); |
| rowset.mutable_tablet_schema()->CopyFrom(schema); |
| create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true); |
| rowset.clear_tablet_schema(); |
| rowset_pbs.emplace_back(rowset); |
| |
| std::unordered_set<std::string> list_files; |
| std::unique_ptr<ListIterator> iter; |
| EXPECT_EQ(accessor->list_all(&iter), 0); |
| EXPECT_TRUE(iter->has_next()); |
| list_files.clear(); |
| for (auto file = iter->next(); file.has_value(); file = iter->next()) { |
| list_files.insert(file->path); |
| } |
| EXPECT_EQ(list_files.size(), 4); |
| // before delete tmp rowset, segment file and idx v2 exist |
| EXPECT_TRUE(list_files.contains("data/10000/1_0.dat")); |
| EXPECT_TRUE(list_files.contains("data/10000/1_0.idx")); |
| EXPECT_TRUE(list_files.contains("data/20000/2_0.dat")); |
| EXPECT_TRUE(list_files.contains("data/20000/2_0.idx")); |
| |
| EXPECT_EQ(-1, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::TMP_ROWSET)); |
| list_files.clear(); |
| iter.reset(); |
| EXPECT_EQ(accessor->list_all(&iter), 0); |
| EXPECT_TRUE(iter->has_next()); |
| for (auto file = iter->next(); file.has_value(); file = iter->next()) { |
| list_files.insert(file->path); |
| } |
| EXPECT_TRUE(list_files.contains("data/20000/2_0.dat")); |
| EXPECT_TRUE(list_files.contains("data/20000/2_0.idx")); |
| // after delete tmp rowset, for valit resource id rowset, both file and idx v2 are removed |
| EXPECT_EQ(list_files.size(), 2); |
| } |
| } |
| |
| } // namespace doris::cloud |