| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/simple_thread_pool.h" |
| #include "recycler/obj_storage_client.h" |
| |
| using namespace doris; |
| |
| namespace doris::cloud { |
| |
| // Mock ObjectListIterator for testing |
| class MockObjectListIterator : public ObjectListIterator { |
| public: |
| MockObjectListIterator(std::vector<ObjectMeta> objects, int fail_after = -1) |
| : objects_(std::move(objects)), fail_after_(fail_after) {} |
| |
| bool is_valid() override { return is_valid_; } |
| |
| bool has_next() override { |
| if (!is_valid_) return false; |
| return current_index_ < objects_.size(); |
| } |
| |
| std::optional<ObjectMeta> next() override { |
| if (!is_valid_ || current_index_ >= objects_.size()) { |
| return std::nullopt; |
| } |
| |
| // Simulate iterator becoming invalid after certain number of calls |
| if (fail_after_ >= 0 && static_cast<int>(current_index_) >= fail_after_) { |
| is_valid_ = false; |
| return std::nullopt; |
| } |
| |
| return objects_[current_index_++]; |
| } |
| |
| void set_invalid() { is_valid_ = false; } |
| |
| private: |
| std::vector<ObjectMeta> objects_; |
| size_t current_index_ = 0; |
| bool is_valid_ = true; |
| int fail_after_ = -1; // -1 means never fail |
| }; |
| |
| // Mock ObjStorageClient for testing delete_objects_recursively_ |
| class MockObjStorageClient : public ObjStorageClient { |
| public: |
| MockObjStorageClient(std::vector<ObjectMeta> objects, int iterator_fail_after = -1) |
| : objects_(std::move(objects)), iterator_fail_after_(iterator_fail_after) {} |
| |
| ObjectStorageResponse put_object(ObjectStoragePathRef path, std::string_view stream) override { |
| return {0}; |
| } |
| |
| ObjectStorageResponse head_object(ObjectStoragePathRef path, ObjectMeta* res) override { |
| return {0}; |
| } |
| |
| std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef path) override { |
| return std::make_unique<MockObjectListIterator>(objects_, iterator_fail_after_); |
| } |
| |
| ObjectStorageResponse delete_objects(const std::string& bucket, std::vector<std::string> keys, |
| ObjClientOptions option) override { |
| delete_calls_++; |
| total_keys_deleted_ += keys.size(); |
| |
| // Simulate delete failure if configured |
| if (fail_delete_after_ >= 0 && delete_calls_ > fail_delete_after_) { |
| return {-1, "simulated delete failure"}; |
| } |
| |
| return {0}; |
| } |
| |
| ObjectStorageResponse delete_object(ObjectStoragePathRef path) override { return {0}; } |
| |
| ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path, |
| ObjClientOptions option, |
| int64_t expiration_time = 0) override { |
| return delete_objects_recursively_(path, option, expiration_time, 1000); |
| } |
| |
| ObjectStorageResponse get_life_cycle(const std::string& bucket, |
| int64_t* expiration_days) override { |
| return {0}; |
| } |
| |
| ObjectStorageResponse check_versioning(const std::string& bucket) override { return {0}; } |
| |
| ObjectStorageResponse abort_multipart_upload(ObjectStoragePathRef path, |
| const std::string& upload_id) override { |
| return {0}; |
| } |
| |
| // Test helper methods |
| int get_delete_calls() const { return delete_calls_; } |
| size_t get_total_keys_deleted() const { return total_keys_deleted_; } |
| void set_fail_delete_after(int n) { fail_delete_after_ = n; } |
| |
| private: |
| std::vector<ObjectMeta> objects_; |
| int iterator_fail_after_ = -1; |
| std::atomic<int> delete_calls_ {0}; |
| std::atomic<size_t> total_keys_deleted_ {0}; |
| int fail_delete_after_ = -1; // -1 means never fail |
| }; |
| |
| class RecyclerBatchDeleteTest : public testing::Test { |
| protected: |
| void SetUp() override { |
| thread_pool_ = std::make_shared<SimpleThreadPool>(4); |
| thread_pool_->start(); |
| } |
| |
| void TearDown() override { |
| if (thread_pool_) { |
| thread_pool_->stop(); |
| } |
| } |
| |
| std::vector<ObjectMeta> generate_objects(size_t count) { |
| std::vector<ObjectMeta> objects; |
| objects.reserve(count); |
| for (size_t i = 0; i < count; ++i) { |
| objects.push_back(ObjectMeta { |
| .key = "test_key_" + std::to_string(i), |
| .size = 100, |
| .mtime_s = 0, |
| }); |
| } |
| return objects; |
| } |
| |
| std::shared_ptr<SimpleThreadPool> thread_pool_; |
| }; |
| |
| // Test 1: Basic batch processing with multiple batches |
| TEST_F(RecyclerBatchDeleteTest, MultipleBatches) { |
| // Save original config and set small batch size for testing |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 3; // 3 tasks per batch |
| |
| // Create 10 objects, with batch_size=2 (keys per task), max_tasks_per_batch=3 |
| // Expected: 10 objects / 2 keys per task = 5 tasks |
| // 5 tasks / 3 tasks per batch = 2 batches (3 tasks + 2 tasks) |
| auto objects = generate_objects(10); |
| MockObjStorageClient client(objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| // Use batch_size=2 to create more tasks |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 2); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_delete_calls(), 5); // 10 objects / 2 = 5 delete calls |
| EXPECT_EQ(client.get_total_keys_deleted(), 10); // All 10 keys deleted |
| |
| // Restore config |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 2: Iterator becomes invalid during iteration |
| TEST_F(RecyclerBatchDeleteTest, IteratorInvalidMidway) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 100; |
| |
| // Create 20 objects but iterator fails after 10 |
| auto objects = generate_objects(20); |
| MockObjStorageClient client(objects, 10); // fail_after=10 |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5); |
| |
| // Should return error because iterator became invalid |
| EXPECT_EQ(response.ret, -1); |
| // Should have processed some objects before failure |
| EXPECT_GT(client.get_total_keys_deleted(), 0); |
| EXPECT_LT(client.get_total_keys_deleted(), 20); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 3: Delete operation fails (triggers cancel) |
| TEST_F(RecyclerBatchDeleteTest, DeleteFailureTriggersCancel) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 10; |
| |
| auto objects = generate_objects(30); |
| MockObjStorageClient client(objects); |
| client.set_fail_delete_after(2); // Fail after 2 successful deletes |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5); |
| |
| // Should return error because delete failed |
| EXPECT_EQ(response.ret, -1); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 4: Empty object list |
| TEST_F(RecyclerBatchDeleteTest, EmptyObjectList) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 100; |
| |
| std::vector<ObjectMeta> empty_objects; |
| MockObjStorageClient client(empty_objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 1000); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_delete_calls(), 0); |
| EXPECT_EQ(client.get_total_keys_deleted(), 0); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 5: Objects less than batch_size |
| TEST_F(RecyclerBatchDeleteTest, ObjectsLessThanBatchSize) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 100; |
| |
| auto objects = generate_objects(5); |
| MockObjStorageClient client(objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| // batch_size=1000, but only 5 objects |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 1000); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_delete_calls(), 1); // All 5 keys in one delete call |
| EXPECT_EQ(client.get_total_keys_deleted(), 5); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 6: Exact batch boundary |
| TEST_F(RecyclerBatchDeleteTest, ExactBatchBoundary) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 2; // 2 tasks per batch |
| |
| // 8 objects with batch_size=2 = 4 tasks |
| // 4 tasks with max_tasks_per_batch=2 = exactly 2 batches |
| auto objects = generate_objects(8); |
| MockObjStorageClient client(objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 2); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_delete_calls(), 4); // 8 / 2 = 4 tasks |
| EXPECT_EQ(client.get_total_keys_deleted(), 8); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 7: Invalid config value (negative) |
| TEST_F(RecyclerBatchDeleteTest, InvalidConfigNegative) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = -1; // Invalid negative value |
| |
| auto objects = generate_objects(10); |
| MockObjStorageClient client(objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| // Should use default value 1000 and still work |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_total_keys_deleted(), 10); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 8: Invalid config value (zero) |
| TEST_F(RecyclerBatchDeleteTest, InvalidConfigZero) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 0; // Invalid zero value |
| |
| auto objects = generate_objects(10); |
| MockObjStorageClient client(objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| // Should use default value 1000 and still work |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_total_keys_deleted(), 10); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 9: Expiration time filtering |
| TEST_F(RecyclerBatchDeleteTest, ExpirationTimeFiltering) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 100; |
| |
| std::vector<ObjectMeta> objects; |
| // Create 10 objects: 5 with old mtime (should be deleted), 5 with new mtime (should be kept) |
| for (int i = 0; i < 5; ++i) { |
| objects.push_back(ObjectMeta { |
| .key = "old_key_" + std::to_string(i), |
| .size = 100, |
| .mtime_s = 100, // Old timestamp |
| }); |
| } |
| for (int i = 0; i < 5; ++i) { |
| objects.push_back(ObjectMeta { |
| .key = "new_key_" + std::to_string(i), |
| .size = 100, |
| .mtime_s = 1000, // New timestamp |
| }); |
| } |
| |
| MockObjStorageClient client(objects); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| // Set expiration_time=500, so only objects with mtime_s <= 500 should be deleted |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 500, 1000); |
| |
| EXPECT_EQ(response.ret, 0); |
| EXPECT_EQ(client.get_total_keys_deleted(), 5); // Only old objects deleted |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| // Test 10: Iterator invalid at start (empty batch scenario) |
| TEST_F(RecyclerBatchDeleteTest, IteratorInvalidAtStart) { |
| int32_t original_config = config::recycler_max_tasks_per_batch; |
| config::recycler_max_tasks_per_batch = 100; |
| |
| // Iterator fails immediately (fail_after=0) |
| auto objects = generate_objects(10); |
| MockObjStorageClient client(objects, 0); |
| |
| ObjClientOptions options; |
| options.executor = thread_pool_; |
| |
| auto response = client.delete_objects_recursively_( |
| {.bucket = "test_bucket", .key = "test_prefix"}, options, 0, 5); |
| |
| // Should return error because iterator was invalid from the start |
| EXPECT_EQ(response.ret, -1); |
| EXPECT_EQ(client.get_delete_calls(), 0); |
| |
| config::recycler_max_tasks_per_batch = original_config; |
| } |
| |
| } // namespace doris::cloud |