| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <gen_cpp/AgentService_types.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <gtest/gtest-message.h> |
| #include <gtest/gtest-test-part.h> |
| #include <gtest/gtest.h> |
| |
| #include <memory> |
| |
| #include "cloud/cloud_base_compaction.h" |
| #include "cloud/cloud_cluster_info.h" |
| #include "cloud/cloud_storage_engine.h" |
| #include "cloud/cloud_tablet.h" |
| #include "cloud/cloud_tablet_mgr.h" |
| #include "json2pb/json_to_pb.h" |
| #include "olap/olap_common.h" |
| #include "olap/rowset/rowset_factory.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/storage_policy.h" |
| #include "olap/tablet_meta.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| class TabletMap; |
| |
| class CloudCompactionTest : public testing::Test { |
| CloudCompactionTest() : _engine(CloudStorageEngine(EngineOptions {})) {} |
| void SetUp() override { |
| config::compaction_promotion_size_mbytes = 1024; |
| config::compaction_promotion_ratio = 0.05; |
| config::compaction_promotion_min_size_mbytes = 64; |
| config::compaction_min_size_mbytes = 64; |
| |
| _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, |
| UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, |
| TCompressionType::LZ4F)); |
| |
| _json_rowset_meta = R"({ |
| "rowset_id": 540081, |
| "tablet_id": 15673, |
| "txn_id": 4042, |
| "tablet_schema_hash": 567997577, |
| "rowset_type": "BETA_ROWSET", |
| "rowset_state": "VISIBLE", |
| "start_version": 2, |
| "end_version": 2, |
| "num_rows": 3929, |
| "total_disk_size": 41, |
| "data_disk_size": 41, |
| "index_disk_size": 235, |
| "empty": false, |
| "load_id": { |
| "hi": -5350970832824939812, |
| "lo": -6717994719194512122 |
| }, |
| "creation_time": 1553765670, |
| "num_segments": 3 |
| })"; |
| _cluster_info = std::make_shared<CloudClusterInfo>(); |
| _cluster_info->_is_in_standby = false; |
| ExecEnv::GetInstance()->_cluster_info = _cluster_info.get(); |
| } |
| void TearDown() override {} |
| |
| void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) { |
| RowsetMetaPB rowset_meta_pb; |
| json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb); |
| rowset_meta_pb.set_start_version(start); |
| rowset_meta_pb.set_end_version(end); |
| rowset_meta_pb.set_creation_time(10000); |
| |
| pb1->init_from_pb(rowset_meta_pb); |
| pb1->set_total_disk_size(41); |
| pb1->set_tablet_schema(_tablet_meta->tablet_schema()); |
| } |
| |
| void init_rs_meta_small_base(std::vector<RowsetMetaSharedPtr>* rs_metas) { |
| RowsetMetaSharedPtr ptr1(new RowsetMeta()); |
| init_rs_meta(ptr1, 0, 0); |
| rs_metas->push_back(ptr1); |
| |
| RowsetMetaSharedPtr ptr2(new RowsetMeta()); |
| init_rs_meta(ptr2, 1, 1); |
| rs_metas->push_back(ptr2); |
| |
| RowsetMetaSharedPtr ptr3(new RowsetMeta()); |
| init_rs_meta(ptr3, 2, 2); |
| rs_metas->push_back(ptr3); |
| |
| RowsetMetaSharedPtr ptr4(new RowsetMeta()); |
| init_rs_meta(ptr4, 3, 3); |
| rs_metas->push_back(ptr4); |
| |
| RowsetMetaSharedPtr ptr5(new RowsetMeta()); |
| init_rs_meta(ptr5, 4, 4); |
| rs_metas->push_back(ptr5); |
| } |
| |
| protected: |
| std::string _json_rowset_meta; |
| TabletMetaSharedPtr _tablet_meta; |
| |
| public: |
| CloudStorageEngine _engine; |
| std::shared_ptr<CloudClusterInfo> _cluster_info; |
| }; |
| |
| TEST_F(CloudCompactionTest, failure_base_compaction_tablet_sleep_test) { |
| auto filter_out = [](CloudTablet* t) { return false; }; |
| CloudTabletMgr mgr(_engine); |
| |
| std::vector<RowsetMetaSharedPtr> rs_metas; |
| init_rs_meta_small_base(&rs_metas); |
| |
| CloudTabletSPtr tablet1 = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| for (auto& rs_meta : rs_metas) { |
| static_cast<void>(_tablet_meta->add_rs_meta(rs_meta)); |
| } |
| tablet1->tablet_meta()->_tablet_id = 10000; |
| tablet1->set_last_base_compaction_failure_time( |
| duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count() - |
| 100000); |
| tablet1->set_last_base_compaction_failure_time(0); |
| tablet1->tablet_meta()->tablet_schema()->set_disable_auto_compaction(false); |
| tablet1->_approximate_num_rowsets = 10; |
| mgr.put_tablet_for_UT(tablet1); |
| |
| int64_t max_score; |
| std::vector<std::shared_ptr<CloudTablet>> tablets {}; |
| Status st = mgr.get_topn_tablets_to_compact(1, CompactionType::BASE_COMPACTION, filter_out, |
| &tablets, &max_score); |
| ASSERT_EQ(st, Status::OK()); |
| ASSERT_EQ(tablets.size(), 1); |
| |
| tablet1->set_last_base_compaction_failure_time( |
| duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count()); |
| st = mgr.get_topn_tablets_to_compact(1, CompactionType::BASE_COMPACTION, filter_out, &tablets, |
| &max_score); |
| ASSERT_EQ(st, Status::OK()); |
| ASSERT_EQ(tablets.size(), 0); |
| } |
| |
| TEST_F(CloudCompactionTest, failure_cumu_compaction_tablet_sleep_test) { |
| auto filter_out = [](CloudTablet* t) { return false; }; |
| CloudTabletMgr mgr(_engine); |
| |
| std::vector<RowsetMetaSharedPtr> rs_metas; |
| init_rs_meta_small_base(&rs_metas); |
| |
| CloudTabletSPtr tablet1 = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| for (auto& rs_meta : rs_metas) { |
| static_cast<void>(_tablet_meta->add_rs_meta(rs_meta)); |
| } |
| tablet1->tablet_meta()->_tablet_id = 10000; |
| tablet1->set_last_cumu_compaction_failure_time( |
| duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count() - |
| 100000); |
| tablet1->set_last_cumu_compaction_failure_time(0); |
| tablet1->tablet_meta()->tablet_schema()->set_disable_auto_compaction(false); |
| tablet1->_approximate_cumu_num_deltas = 10; |
| mgr.put_tablet_for_UT(tablet1); |
| |
| int64_t max_score; |
| std::vector<std::shared_ptr<CloudTablet>> tablets {}; |
| Status st = mgr.get_topn_tablets_to_compact(1, CompactionType::CUMULATIVE_COMPACTION, |
| filter_out, &tablets, &max_score); |
| ASSERT_EQ(st, Status::OK()); |
| ASSERT_EQ(tablets.size(), 1); |
| |
| tablet1->set_last_cumu_compaction_failure_time( |
| duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count()); |
| st = mgr.get_topn_tablets_to_compact(1, CompactionType::BASE_COMPACTION, filter_out, &tablets, |
| &max_score); |
| ASSERT_EQ(st, Status::OK()); |
| ASSERT_EQ(tablets.size(), 0); |
| } |
| |
| static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, |
| int data_size) { |
| auto rs_meta = std::make_shared<RowsetMeta>(); |
| rs_meta->set_rowset_type(BETA_ROWSET); // important |
| rs_meta->_rowset_meta_pb.set_start_version(version.first); |
| rs_meta->_rowset_meta_pb.set_end_version(version.second); |
| rs_meta->set_num_segments(num_segments); |
| rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); |
| rs_meta->set_total_disk_size(data_size); |
| RowsetSharedPtr rowset; |
| Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); |
| if (!st.ok()) { |
| return nullptr; |
| } |
| return rowset; |
| } |
| |
| class TestableCloudCompaction : public CloudCompactionMixin { |
| public: |
| TestableCloudCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet) |
| : CloudCompactionMixin(engine, tablet, "test_compaction") {} |
| |
| // Set input rowsets for testing |
| void set_input_rowsets(const std::vector<RowsetSharedPtr>& rowsets) { |
| _input_rowsets = rowsets; |
| } |
| |
| // Get input rowsets for verification |
| const std::vector<RowsetSharedPtr>& get_input_rowsets() const { return _input_rowsets; } |
| |
| // Expose the protected method for testing |
| size_t test_apply_txn_size_truncation_and_log(const std::string& compaction_name) { |
| return apply_txn_size_truncation_and_log(compaction_name); |
| } |
| |
| Status prepare_compact() override { return Status::OK(); } |
| |
| ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; } |
| |
| std::string_view compaction_name() const override { return "test_compaction"; } |
| }; |
| |
| TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) { |
| S3Conf s3_conf {.bucket = "bucket", |
| .prefix = "prefix", |
| .client_conf = { |
| .endpoint = "endpoint", |
| .region = "region", |
| .ak = "ak", |
| .sk = "sk", |
| .token = "", |
| .bucket = "", |
| .role_arn = "", |
| .external_id = "", |
| }}; |
| std::string resource_id = "10000"; |
| auto res = io::S3FileSystem::create(std::move(s3_conf), resource_id); |
| ASSERT_TRUE(res.has_value()) << res.error(); |
| auto fs = res.value(); |
| StorageResource storage_resource(fs); |
| |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| TestableCloudCompaction compaction(_engine, tablet); |
| |
| // Test case 1: All rowsets are empty (num_segments = 0) - should succeed |
| { |
| std::vector<RowsetSharedPtr> rowsets; |
| |
| RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41); |
| ASSERT_TRUE(rowset1 != nullptr); |
| rowset1->set_hole_rowset(true); // Mark as hole rowset since num_segments=0 |
| rowsets.push_back(rowset1); |
| |
| RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41); |
| ASSERT_TRUE(rowset2 != nullptr); |
| rowset2->set_hole_rowset(true); // Mark as hole rowset since num_segments=0 |
| rowsets.push_back(rowset2); |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| RowsetWriterContext ctx; |
| Status st = compaction.set_storage_resource_from_input_rowsets(ctx); |
| ASSERT_TRUE(st.ok()) << st.to_string(); |
| // No storage resource should be set since no rowset has resource_id |
| ASSERT_FALSE(ctx.storage_resource.has_value()); |
| } |
| |
| // Test case 2: Backward iteration - last rowset has resource_id |
| { |
| std::vector<RowsetSharedPtr> rowsets; |
| |
| // First rowset: empty, no resource_id |
| RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41); |
| ASSERT_TRUE(rowset1 != nullptr); |
| rowset1->set_hole_rowset(true); |
| rowsets.push_back(rowset1); |
| |
| // Second rowset: empty, no resource_id |
| RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41); |
| ASSERT_TRUE(rowset2 != nullptr); |
| rowset2->set_hole_rowset(true); |
| rowsets.push_back(rowset2); |
| |
| // Third rowset: has resource_id (should be found during backward iteration) |
| RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41); |
| ASSERT_TRUE(rowset3 != nullptr); |
| rowset3->rowset_meta()->set_remote_storage_resource(storage_resource); |
| rowsets.push_back(rowset3); |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| RowsetWriterContext ctx; |
| Status st = compaction.set_storage_resource_from_input_rowsets(ctx); |
| ASSERT_TRUE(st.ok()) << st.to_string(); |
| // Storage resource should be set from rowset3 |
| ASSERT_TRUE(ctx.storage_resource.has_value()); |
| } |
| |
| // Test case 3: Multiple rowsets with resource_id - should use the last one (backward iteration) |
| { |
| std::vector<RowsetSharedPtr> rowsets; |
| |
| // First rowset: has resource_id |
| RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41); |
| ASSERT_TRUE(rowset1 != nullptr); |
| StorageResource first_resource(fs); |
| rowset1->rowset_meta()->set_remote_storage_resource(first_resource); |
| rowsets.push_back(rowset1); |
| |
| // Second rowset: empty, no resource_id |
| RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41); |
| ASSERT_TRUE(rowset2 != nullptr); |
| rowset2->set_hole_rowset(true); |
| rowsets.push_back(rowset2); |
| |
| // Third rowset: has different resource_id (should be used due to backward iteration) |
| RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41); |
| ASSERT_TRUE(rowset3 != nullptr); |
| rowset3->rowset_meta()->set_remote_storage_resource(storage_resource); |
| rowsets.push_back(rowset3); |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| RowsetWriterContext ctx; |
| Status st = compaction.set_storage_resource_from_input_rowsets(ctx); |
| ASSERT_TRUE(st.ok()) << st.to_string(); |
| // Storage resource should be set from rowset3 (last one with resource_id) |
| ASSERT_TRUE(ctx.storage_resource.has_value()); |
| } |
| |
| // Test case 4: Non-empty rowset in the middle without resource_id - should fail |
| { |
| std::vector<RowsetSharedPtr> rowsets; |
| |
| // First rowset: has resource_id |
| RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41); |
| ASSERT_TRUE(rowset1 != nullptr); |
| rowset1->rowset_meta()->set_remote_storage_resource(storage_resource); |
| rowsets.push_back(rowset1); |
| |
| // Second rowset: non-empty but no resource_id (invalid) |
| RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 2, false, 41); |
| ASSERT_TRUE(rowset2 != nullptr); |
| // Intentionally don't set resource_id |
| rowsets.push_back(rowset2); |
| |
| // Third rowset: empty, no resource_id |
| RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 0, false, 41); |
| ASSERT_TRUE(rowset3 != nullptr); |
| rowset3->set_hole_rowset(true); // Mark as hole rowset since num_segments=0 |
| rowsets.push_back(rowset3); |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| RowsetWriterContext ctx; |
| Status st = compaction.set_storage_resource_from_input_rowsets(ctx); |
| ASSERT_TRUE(st.is<ErrorCode::INTERNAL_ERROR>()); |
| ASSERT_TRUE(st.to_string().find("Non-empty rowset must have valid resource_id") != |
| std::string::npos) |
| << st.to_string(); |
| } |
| |
| // Test case 5: Empty input rowsets - should succeed |
| { |
| std::vector<RowsetSharedPtr> rowsets; // Empty vector |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| RowsetWriterContext ctx; |
| Status st = compaction.set_storage_resource_from_input_rowsets(ctx); |
| ASSERT_TRUE(st.ok()) << st.to_string(); |
| // No storage resource should be set |
| ASSERT_FALSE(ctx.storage_resource.has_value()); |
| } |
| } |
| TEST_F(CloudCompactionTest, should_cache_compaction_output) { |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>()); |
| CloudBaseCompaction cloud_base_compaction(_engine, tablet); |
| cloud_base_compaction._input_rowsets_total_size = 0; |
| cloud_base_compaction._input_rowsets_cached_data_size = 0; |
| cloud_base_compaction._input_rowsets_cached_index_size = 0; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 0; |
| cloud_base_compaction._input_rowsets_cached_index_size = 0; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 70; |
| cloud_base_compaction._input_rowsets_cached_index_size = 0; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 0; |
| cloud_base_compaction._input_rowsets_cached_index_size = 70; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 0; |
| cloud_base_compaction._input_rowsets_cached_index_size = 70; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), false); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 80; |
| cloud_base_compaction._input_rowsets_cached_index_size = 0; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 0; |
| cloud_base_compaction._input_rowsets_cached_index_size = 80; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true); |
| |
| cloud_base_compaction._input_rowsets_total_size = 100; |
| cloud_base_compaction._input_rowsets_cached_data_size = 50; |
| cloud_base_compaction._input_rowsets_cached_index_size = 50; |
| ASSERT_EQ(cloud_base_compaction.should_cache_compaction_output(), true); |
| LOG(INFO) << "should_cache_compaction_output done"; |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_empty_input) { |
| std::vector<RowsetSharedPtr> rowsets; |
| int64_t kept_size = 100; |
| int64_t truncated_size = 50; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(kept_size, 0); |
| ASSERT_EQ(truncated_size, 0); |
| ASSERT_EQ(rowsets.size(), 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_single_rowset_under_limit) { |
| // Create a single rowset |
| std::vector<RowsetSharedPtr> rowsets; |
| RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 1024); |
| ASSERT_TRUE(rowset1 != nullptr); |
| rowsets.push_back(rowset1); |
| |
| // Set a large max size |
| config::compaction_txn_max_size_bytes = 1024 * 1024 * 1024; // 1GB |
| |
| int64_t kept_size = 0; |
| int64_t truncated_size = 0; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(rowsets.size(), 1); |
| ASSERT_GT(kept_size, 0); |
| ASSERT_EQ(truncated_size, 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_multiple_rowsets_all_fit) { |
| std::vector<RowsetSharedPtr> rowsets; |
| for (int i = 0; i < 5; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| config::compaction_txn_max_size_bytes = 1024 * 1024 * 1024; // 1GB |
| |
| int64_t kept_size = 0; |
| int64_t truncated_size = 0; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(rowsets.size(), 5); |
| ASSERT_GT(kept_size, 0); |
| ASSERT_EQ(truncated_size, 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_exceeds_limit) { |
| std::vector<RowsetSharedPtr> rowsets; |
| for (int i = 0; i < 10; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| // Set a very small max size to force truncation |
| config::compaction_txn_max_size_bytes = 50; // 50 bytes, should keep only a few rowsets |
| |
| int64_t kept_size = 0; |
| int64_t truncated_size = 0; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| // Should truncate some rowsets |
| ASSERT_GT(truncated, 0); |
| ASSERT_LT(rowsets.size(), 10); |
| ASSERT_GT(rowsets.size(), 0); // At least 1 rowset kept |
| ASSERT_GT(truncated_size, 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_first_rowset_exceeds_limit) { |
| std::vector<RowsetSharedPtr> rowsets; |
| RowsetSharedPtr rowset1 = create_rowset(Version(0, 0), 1, false, 1024); |
| ASSERT_TRUE(rowset1 != nullptr); |
| rowsets.push_back(rowset1); |
| |
| // Set max size smaller than the first rowset's metadata size |
| config::compaction_txn_max_size_bytes = 1; // 1 byte |
| |
| int64_t kept_size = 0; |
| int64_t truncated_size = 0; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| // Should keep at least 1 rowset even if it exceeds the limit |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(rowsets.size(), 1); |
| ASSERT_GT(kept_size, config::compaction_txn_max_size_bytes); |
| ASSERT_EQ(truncated_size, 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_exact_boundary) { |
| std::vector<RowsetSharedPtr> rowsets; |
| RowsetSharedPtr rowset1 = create_rowset(Version(0, 0), 1, false, 1024); |
| ASSERT_TRUE(rowset1 != nullptr); |
| rowsets.push_back(rowset1); |
| |
| // Get the actual size of the first rowset |
| int64_t first_kept_size = 0; |
| int64_t first_truncated_size = 0; |
| |
| std::vector<RowsetSharedPtr> temp_rowsets = {rowset1}; |
| cloud::truncate_rowsets_by_txn_size(temp_rowsets, first_kept_size, first_truncated_size); |
| |
| // Add more rowsets |
| for (int i = 1; i < 5; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| // Set max size to exactly the size of first rowset |
| config::compaction_txn_max_size_bytes = first_kept_size; |
| |
| int64_t kept_size = 0; |
| int64_t truncated_size = 0; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| // Should keep only 1 rowset at the boundary |
| ASSERT_EQ(rowsets.size(), 1); |
| ASSERT_EQ(truncated, 4); |
| ASSERT_EQ(kept_size, first_kept_size); |
| ASSERT_GT(truncated_size, 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_truncate_rowsets_by_txn_size_output_parameters) { |
| std::vector<RowsetSharedPtr> rowsets; |
| for (int i = 0; i < 3; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| config::compaction_txn_max_size_bytes = 1024 * 1024; |
| |
| int64_t kept_size = 0; |
| int64_t truncated_size = 0; |
| |
| size_t truncated = cloud::truncate_rowsets_by_txn_size(rowsets, kept_size, truncated_size); |
| |
| // Verify output parameters are set correctly |
| ASSERT_EQ(truncated, 0); // All rowsets fit |
| ASSERT_EQ(rowsets.size(), 3); |
| ASSERT_GT(kept_size, 0); |
| ASSERT_EQ(truncated_size, 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_apply_txn_size_truncation_and_log_empty_input) { |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| TestableCloudCompaction compaction(_engine, tablet); |
| |
| // Test with empty input rowsets |
| std::vector<RowsetSharedPtr> empty_rowsets; |
| compaction.set_input_rowsets(empty_rowsets); |
| |
| size_t truncated = compaction.test_apply_txn_size_truncation_and_log("test_compaction"); |
| |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(compaction.get_input_rowsets().size(), 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_apply_txn_size_truncation_and_log_no_truncation) { |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| TestableCloudCompaction compaction(_engine, tablet); |
| |
| // Create rowsets that fit within the limit |
| std::vector<RowsetSharedPtr> rowsets; |
| for (int i = 0; i < 3; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| // Set a large max size |
| config::compaction_txn_max_size_bytes = 1024 * 1024 * 1024; // 1GB |
| |
| size_t truncated = compaction.test_apply_txn_size_truncation_and_log("test_compaction"); |
| |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(compaction.get_input_rowsets().size(), 3); |
| } |
| |
| TEST_F(CloudCompactionTest, test_apply_txn_size_truncation_and_log_with_truncation) { |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| TestableCloudCompaction compaction(_engine, tablet); |
| |
| // Create multiple rowsets |
| std::vector<RowsetSharedPtr> rowsets; |
| for (int i = 0; i < 10; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| // Set a small max size to force truncation |
| config::compaction_txn_max_size_bytes = 100; // Very small to force truncation |
| |
| size_t truncated = compaction.test_apply_txn_size_truncation_and_log("test_compaction"); |
| |
| // Should have truncated some rowsets |
| ASSERT_GT(truncated, 0); |
| ASSERT_LT(compaction.get_input_rowsets().size(), 10); |
| ASSERT_GT(compaction.get_input_rowsets().size(), 0); |
| } |
| |
| TEST_F(CloudCompactionTest, test_apply_txn_size_truncation_and_log_version_range) { |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| TestableCloudCompaction compaction(_engine, tablet); |
| |
| // Create rowsets with consecutive versions |
| std::vector<RowsetSharedPtr> rowsets; |
| for (int i = 10; i < 20; i++) { |
| RowsetSharedPtr rowset = create_rowset(Version(i, i), 1, false, 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| } |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| // Set a size that will keep first 5 rowsets |
| config::compaction_txn_max_size_bytes = 100; // Small enough to truncate |
| |
| size_t truncated = compaction.test_apply_txn_size_truncation_and_log("base_compaction"); |
| |
| if (truncated > 0) { |
| // Verify that the version range is adjusted correctly |
| ASSERT_GT(compaction.get_input_rowsets().size(), 0); |
| // First rowset should still start at version 10 |
| ASSERT_EQ(compaction.get_input_rowsets().front()->start_version(), 10); |
| // Last rowset version should be less than 19 |
| ASSERT_LT(compaction.get_input_rowsets().back()->end_version(), 20); |
| } |
| } |
| |
| TEST_F(CloudCompactionTest, test_apply_txn_size_truncation_and_log_single_large_rowset) { |
| CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine, _tablet_meta); |
| TestableCloudCompaction compaction(_engine, tablet); |
| |
| // Create a single large rowset |
| std::vector<RowsetSharedPtr> rowsets; |
| RowsetSharedPtr rowset = create_rowset(Version(0, 0), 1, false, 1024 * 1024); |
| ASSERT_TRUE(rowset != nullptr); |
| rowsets.push_back(rowset); |
| |
| compaction.set_input_rowsets(rowsets); |
| |
| // Set max size smaller than the rowset's metadata size |
| config::compaction_txn_max_size_bytes = 1; |
| |
| size_t truncated = compaction.test_apply_txn_size_truncation_and_log("cumu_compaction"); |
| |
| // Should keep at least 1 rowset even if it exceeds the limit |
| ASSERT_EQ(truncated, 0); |
| ASSERT_EQ(compaction.get_input_rowsets().size(), 1); |
| } |
| } // namespace doris |