// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/s3_file_system.h"
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectsResult.h>
#include <aws/s3/model/Error.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <any>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <ios>
#include <iostream>
#include <iterator>
#include <memory>
#include <string>
#include <tuple>
#include <vector>
#include "common/config.h"
#include "cpp/sync_point.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/obj_storage_client.h"
#include "runtime/exec_env.h"
#include "util/s3_util.h"
namespace doris {
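// Returns the value of the named environment variable, or "" if it is unset.
// The immediately invoked lambda lets the lookup be used as an expression.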
#define GET_ENV_IF_DEFINED(var) \
([]() -> std::string { \
const char* val = std::getenv(#var); \
return val ? std::string(val) : ""; \
}())
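// Fixed filesystem id passed to io::S3FileSystem::create for the test instance.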
static const char* MOCK_S3_FS_ID = "mock_s3_fs_id_6";
class S3TestConfig {
public:
S3TestConfig() {
// Check if S3 client is enabled
enabled = (GET_ENV_IF_DEFINED(ENABLE_S3_CLIENT) == "1");
if (!enabled) {
return;
}
access_key = GET_ENV_IF_DEFINED(S3_AK);
secret_key = GET_ENV_IF_DEFINED(S3_SK);
endpoint = GET_ENV_IF_DEFINED(S3_ENDPOINT);
provider = GET_ENV_IF_DEFINED(S3_PROVIDER);
bucket = GET_ENV_IF_DEFINED(S3_BUCKET);
region = GET_ENV_IF_DEFINED(S3_REGION);
prefix = GET_ENV_IF_DEFINED(S3_PREFIX);
}
bool is_enabled() const { return enabled; }
bool is_valid() const {
return enabled && !access_key.empty() && !secret_key.empty() && !endpoint.empty() &&
!region.empty() && !bucket.empty();
}
std::string get_access_key() const { return access_key; }
std::string get_secret_key() const { return secret_key; }
std::string get_endpoint() const { return endpoint; }
std::string get_provider() const { return provider; }
std::string get_bucket() const { return bucket; }
std::string get_prefix() const { return prefix; }
std::string get_region() const { return region; }
void print_config() const {
std::cout << "S3 Test Configuration:" << std::endl;
std::cout << " Enabled: " << (enabled ? "Yes" : "No") << std::endl;
if (enabled) {
std::cout << " Access Key: " << (access_key.empty() ? "<empty>" : "<set>")
<< std::endl;
std::cout << " Secret Key: " << (secret_key.empty() ? "<empty>" : "<hidden>")
<< std::endl;
std::cout << " Endpoint: " << endpoint << std::endl;
std::cout << " Provider: " << provider << std::endl;
std::cout << " Bucket: " << bucket << std::endl;
std::cout << " Region: " << region << std::endl;
std::cout << " Prefix: " << prefix << std::endl;
}
}
private:
bool enabled = false;
std::string access_key;
std::string secret_key;
std::string endpoint;
std::string provider;
std::string bucket;
std::string region;
std::string prefix;
};
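// To run these tests against a live bucket, export (values are placeholders):
//   ENABLE_S3_CLIENT=1 S3_AK=... S3_SK=... S3_ENDPOINT=... S3_REGION=... \
//   S3_BUCKET=... [S3_PROVIDER=...] [S3_PREFIX=...]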
class S3FileSystemTest : public ::testing::Test {
protected:
void SetUp() override {
config_ = std::make_shared<S3TestConfig>();
// Print configuration for debugging (always print for S3 tests)
config_->print_config();
// Skip tests if S3 is not enabled or not configured properly
if (!config_->is_enabled()) {
GTEST_SKIP() << "S3 client is not enabled. Use --enable_s3_client flag to enable.";
}
if (!config_->is_valid()) {
GTEST_SKIP() << "S3 configuration is incomplete. Required: AK, SK, ENDPOINT, BUCKET.";
}
// Attempt to create S3 client
if (auto st = create_client(); !st.ok()) {
GTEST_SKIP() << "Failed to create S3 client with provided configuration."
<< st.to_string();
}
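// The S3 file writer submits upload work through ExecEnv's
// _s3_file_upload_thread_pool, so give the test process a small pool.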
std::unique_ptr<ThreadPool> _pool;
std::ignore = ThreadPoolBuilder("s3_upload_file_thread_pool")
.set_min_threads(5)
.set_max_threads(10)
.build(&_pool);
ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
std::string test_path = "s3_fs_test_dir";
global_test_prefix_ = get_unique_test_path(test_path);
auto status = s3_fs_->delete_directory(global_test_prefix_);
EXPECT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
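// Map the S3_PROVIDER string to an object-storage client type; anything
// other than "AZURE" falls back to S3-compatible (AWS).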
io::ObjStorageType convert_provider(const std::string& provider_str) {
if (provider_str == "AZURE") {
return io::ObjStorageType::AZURE;
} else {
return io::ObjStorageType::AWS; // Default to AWS S3
}
}
Status create_client() {
S3Conf s3_conf;
s3_conf.bucket = config_->get_bucket();
s3_conf.prefix = config_->get_prefix();
s3_conf.client_conf.ak = config_->get_access_key();
s3_conf.client_conf.sk = config_->get_secret_key();
s3_conf.client_conf.endpoint = config_->get_endpoint();
s3_conf.client_conf.bucket = config_->get_bucket();
s3_conf.client_conf.region = config_->get_region();
s3_conf.client_conf.provider = convert_provider(config_->get_provider());
s3_fs_ = DORIS_TRY(io::S3FileSystem::create(s3_conf, MOCK_S3_FS_ID));
return Status::OK();
}
void TearDown() override {
// Cleanup resources
ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset();
if (s3_fs_) {
auto status = s3_fs_->delete_directory(global_test_prefix_);
EXPECT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
}
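// Builds "<S3_PREFIX>/ut_<test_name>/" so everything the suite creates lives
// under one disposable prefix.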
std::string get_unique_test_path(const std::string& test_name) {
std::string path = config_->get_prefix();
if (!path.empty() && path.back() != '/') {
path += '/';
}
path += "ut_" + test_name + "/";
return path;
}
std::shared_ptr<S3TestConfig> config_;
std::shared_ptr<io::S3FileSystem> s3_fs_;
std::string global_test_prefix_;
};
// Test: Simple put object test
TEST_F(S3FileSystemTest, SimplePutObjectTest) {
ASSERT_NE(s3_fs_, nullptr);
// Generate a unique test file path
std::string test_file = global_test_prefix_ + "test_file.txt";
std::cout << "Test file path: " << test_file << std::endl;
// Create file writer
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
ASSERT_NE(writer, nullptr);
// Write test data
std::string test_data = "Hello, S3! This is a simple test.";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
// Close the writer
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
std::cout << "Successfully wrote " << test_data.size() << " bytes to S3" << std::endl;
// Verify file exists
bool exists = false;
status = s3_fs_->exists(test_file, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_TRUE(exists) << "File should exist after writing";
// Verify file size
int64_t file_size = 0;
status = s3_fs_->file_size(test_file, &file_size);
ASSERT_TRUE(status.ok()) << "Failed to get file size: " << status.to_string();
EXPECT_EQ(file_size, test_data.size()) << "File size mismatch";
std::cout << "File size: " << file_size << " bytes" << std::endl;
std::cout << "Test completed successfully!" << std::endl;
// Cleanup test file
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
// Test: Large file with multipart upload
TEST_F(S3FileSystemTest, MultipartUploadTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_file = global_test_prefix_ + "large_file.dat";
std::cout << "Test file path: " << test_file << std::endl;
// Create file writer
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
ASSERT_NE(writer, nullptr);
// Write large data to trigger multipart upload (>5MB for multipart)
size_t chunk_size = 1024 * 1024; // 1MB
size_t num_chunks = 6; // 6MB total
std::string chunk_data(chunk_size, 'A');
for (size_t i = 0; i < num_chunks; ++i) {
// Vary the data pattern for each chunk
std::ranges::fill(chunk_data, 'A' + (i % 26));
status = writer->append(chunk_data);
ASSERT_TRUE(status.ok()) << "Failed to write chunk " << i << ": " << status.to_string();
}
// Close the writer to complete multipart upload
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
std::cout << "Successfully uploaded " << (chunk_size * num_chunks)
<< " bytes using multipart upload" << std::endl;
// Verify file exists and has correct size
bool exists = false;
status = s3_fs_->exists(test_file, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_TRUE(exists) << "File should exist after multipart upload";
int64_t file_size = 0;
status = s3_fs_->file_size(test_file, &file_size);
ASSERT_TRUE(status.ok()) << "Failed to get file size: " << status.to_string();
EXPECT_EQ(file_size, chunk_size * num_chunks) << "File size mismatch";
std::cout << "Multipart upload test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
// Test: List objects in directory
TEST_F(S3FileSystemTest, ListObjectsTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_dir = global_test_prefix_ + "list_test_dir/";
std::cout << "Test directory path: " << test_dir << std::endl;
// Create multiple test files
std::vector<std::string> test_files = {
test_dir + "file1.txt",
test_dir + "file2.txt",
test_dir + "subdir/file3.txt",
};
for (const auto& file_path : test_files) {
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(file_path, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string data = "Test data for " + file_path;
status = writer->append(data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// List files in directory
bool exists = false;
std::vector<io::FileInfo> files;
Status status = s3_fs_->list(test_dir, true, &files, &exists);
ASSERT_TRUE(status.ok()) << "Failed to list files: " << status.to_string();
EXPECT_TRUE(exists);
std::cout << "Found " << files.size() << " files" << std::endl;
for (const auto& file : files) {
std::cout << " - " << file.file_name << " (" << file.file_size << " bytes)" << std::endl;
}
// Verify we got all files
EXPECT_GE(files.size(), test_files.size()) << "Should list all created files";
std::cout << "List objects test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_directory(test_dir);
ASSERT_TRUE(status.ok()) << "Failed to delete test directory: " << status.to_string();
}
// Test: Batch delete files
TEST_F(S3FileSystemTest, BatchDeleteTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_dir = global_test_prefix_ + "batch_delete_test/";
std::cout << "Test directory path: " << test_dir << std::endl;
// Create multiple test files
std::vector<std::string> test_files;
for (int i = 0; i < 5; ++i) {
std::string file_path = test_dir + "file_" + std::to_string(i) + ".txt";
test_files.push_back(file_path);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(file_path, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string data = "Test data " + std::to_string(i);
status = writer->append(data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Verify all files exist
for (const auto& file_path : test_files) {
bool exists = false;
Status status = s3_fs_->exists(file_path, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_TRUE(exists) << "File should exist: " << file_path;
}
// Batch delete files
std::vector<io::Path> paths_to_delete;
for (const auto& file_path : test_files) {
paths_to_delete.emplace_back(file_path);
}
Status status = s3_fs_->batch_delete(paths_to_delete);
ASSERT_TRUE(status.ok()) << "Failed to batch delete files: " << status.to_string();
std::cout << "Successfully batch deleted " << paths_to_delete.size() << " files" << std::endl;
// Verify all files are deleted
for (const auto& file_path : test_files) {
bool exists = false;
status = s3_fs_->exists(file_path, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_FALSE(exists) << "File should not exist after deletion: " << file_path;
}
std::cout << "Batch delete test completed successfully!" << std::endl;
}
// Test: Delete directory recursively
TEST_F(S3FileSystemTest, DeleteDirectoryRecursivelyTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_dir = global_test_prefix_ + "recursive_delete_test/";
std::cout << "Test directory path: " << test_dir << std::endl;
// Create nested directory structure with files
std::vector<std::string> test_files = {
test_dir + "file1.txt",
test_dir + "file2.txt",
test_dir + "subdir1/file3.txt",
test_dir + "subdir1/file4.txt",
test_dir + "subdir2/nested/file5.txt",
};
for (const auto& file_path : test_files) {
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(file_path, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string data = "Test data for " + file_path;
status = writer->append(data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Verify files exist
for (const auto& file_path : test_files) {
bool exists = false;
Status status = s3_fs_->exists(file_path, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_TRUE(exists) << "File should exist: " << file_path;
}
// Delete directory recursively
Status status = s3_fs_->delete_directory(test_dir);
ASSERT_TRUE(status.ok()) << "Failed to delete directory recursively: " << status.to_string();
std::cout << "Successfully deleted directory recursively" << std::endl;
// Verify all files are deleted
for (const auto& file_path : test_files) {
bool exists = false;
status = s3_fs_->exists(file_path, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_FALSE(exists) << "File should not exist after recursive deletion: " << file_path;
}
std::cout << "Delete directory recursively test completed successfully!" << std::endl;
}
// Test: Upload and download local file
TEST_F(S3FileSystemTest, UploadDownloadTest) {
ASSERT_NE(s3_fs_, nullptr);
// Create a temporary local file
std::string local_file = "/tmp/test_upload_file.txt";
std::string test_content = "This is test content for upload/download test.\nLine 2\nLine 3";
std::ofstream ofs(local_file);
ASSERT_TRUE(ofs.is_open()) << "Failed to create local file";
ofs << test_content;
ofs.close();
std::string remote_file = global_test_prefix_ + "uploaded_file.txt";
std::cout << "Remote file path: " << remote_file << std::endl;
// Upload local file to S3
Status status = s3_fs_->upload(local_file, remote_file);
ASSERT_TRUE(status.ok()) << "Failed to upload file: " << status.to_string();
std::cout << "Successfully uploaded file" << std::endl;
// Verify file exists on S3
bool exists = false;
status = s3_fs_->exists(remote_file, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_TRUE(exists) << "Uploaded file should exist";
// Download file from S3
std::string download_file = "/tmp/test_download_file.txt";
status = s3_fs_->download(remote_file, download_file);
ASSERT_TRUE(status.ok()) << "Failed to download file: " << status.to_string();
std::cout << "Successfully downloaded file" << std::endl;
// Verify downloaded content matches original
std::ifstream ifs(download_file);
ASSERT_TRUE(ifs.is_open()) << "Failed to open downloaded file";
std::string downloaded_content((std::istreambuf_iterator<char>(ifs)),
std::istreambuf_iterator<char>());
ifs.close();
EXPECT_EQ(test_content, downloaded_content) << "Downloaded content should match original";
std::cout << "Upload/download test completed successfully!" << std::endl;
// Cleanup
std::remove(local_file.c_str());
std::remove(download_file.c_str());
status = s3_fs_->delete_file(remote_file);
ASSERT_TRUE(status.ok()) << "Failed to delete remote file: " << status.to_string();
}
// Test: Open file and read content
TEST_F(S3FileSystemTest, OpenFileAndReadTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_file = global_test_prefix_ + "read_test_file.txt";
std::cout << "Test file path: " << test_file << std::endl;
// Create and write test file
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data =
"Hello, S3 Read Test! This is line 1.\nThis is line 2.\nThis is line 3.";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
// Open file for reading
io::FileReaderSPtr reader;
status = s3_fs_->open_file(test_file, &reader);
ASSERT_TRUE(status.ok()) << "Failed to open file: " << status.to_string();
ASSERT_NE(reader, nullptr);
// Read entire file
size_t file_size = reader->size();
EXPECT_EQ(file_size, test_data.size()) << "File size should match written data";
std::vector<char> buffer(file_size);
size_t bytes_read = 0;
status = reader->read_at(0, Slice(buffer.data(), file_size), &bytes_read);
ASSERT_TRUE(status.ok()) << "Failed to read file: " << status.to_string();
EXPECT_EQ(bytes_read, file_size) << "Should read entire file";
std::string read_data(buffer.data(), bytes_read);
EXPECT_EQ(read_data, test_data) << "Read data should match written data";
std::cout << "Successfully read " << bytes_read << " bytes from S3" << std::endl;
// Test partial read
size_t offset = 7;
size_t read_size = 15;
std::vector<char> partial_buffer(read_size);
bytes_read = 0;
status = reader->read_at(offset, Slice(partial_buffer.data(), read_size), &bytes_read);
ASSERT_TRUE(status.ok()) << "Failed to read file partially: " << status.to_string();
EXPECT_EQ(bytes_read, read_size) << "Should read requested bytes";
std::string partial_data(partial_buffer.data(), bytes_read);
std::string expected_partial = test_data.substr(offset, read_size);
EXPECT_EQ(partial_data, expected_partial) << "Partial read data should match";
std::cout << "Open file and read test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
// Test: Generate presigned URL
TEST_F(S3FileSystemTest, GeneratePresignedUrlTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_file = global_test_prefix_ + "presigned_url_test.txt";
std::cout << "Test file path: " << test_file << std::endl;
// Create test file
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Test data for presigned URL";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
// Generate presigned URL
int64_t expiration_secs = 3600; // 1 hour
std::string presigned_url = s3_fs_->generate_presigned_url(test_file, expiration_secs, false);
std::cout << "Generated presigned URL: " << presigned_url << std::endl;
// Verify URL is not empty and contains expected components
EXPECT_FALSE(presigned_url.empty()) << "Presigned URL should not be empty";
EXPECT_NE(presigned_url.find("http"), std::string::npos)
<< "URL should contain an http(s) scheme";
// For public endpoint test (if using OSS)
if (config_->get_provider() == "OSS") {
std::string presigned_url_public =
s3_fs_->generate_presigned_url(test_file, expiration_secs, true);
std::cout << "Generated presigned URL (public): " << presigned_url_public << std::endl;
EXPECT_FALSE(presigned_url_public.empty()) << "Public presigned URL should not be empty";
}
std::cout << "Generate presigned URL test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
// Test: Create directory (should be no-op but succeed)
TEST_F(S3FileSystemTest, CreateDirectoryTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_dir = global_test_prefix_ + "test_directory/";
// Create directory (should succeed but is essentially a no-op for S3)
Status status = s3_fs_->create_directory(test_dir);
ASSERT_TRUE(status.ok()) << "Failed to create directory: " << status.to_string();
std::cout << "Create directory test completed successfully!" << std::endl;
}
// Test: File size operation with head_object
TEST_F(S3FileSystemTest, FileSizeHeadObjectTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_file = global_test_prefix_ + "size_test_file.txt";
// Create file with known size
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "This is a test string with known length: 12345";
size_t expected_size = test_data.size();
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
// Get file size using head_object
int64_t file_size = 0;
status = s3_fs_->file_size(test_file, &file_size);
ASSERT_TRUE(status.ok()) << "Failed to get file size: " << status.to_string();
EXPECT_EQ(file_size, expected_size) << "File size should match written data";
std::cout << "File size (head_object) test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
// Test: Exists check for non-existent file
TEST_F(S3FileSystemTest, ExistsNonExistentFileTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string non_existent_file = global_test_prefix_ + "non_existent_file_12345.txt";
bool exists = true;
Status status = s3_fs_->exists(non_existent_file, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_FALSE(exists) << "Non-existent file should not exist";
std::cout << "Exists non-existent file test completed successfully!" << std::endl;
}
// ==================== Rate Limiter Tests ====================
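// The reset_s3_rate_limiter(type, max_speed, max_burst, limit) helper
// reconfigures the global S3 rate limiter: max_speed is tokens per second,
// max_burst is the token-bucket size, and limit caps the total number of
// requests (0 removes the cap). The tests below rely on the total-request
// cap to fail deterministically rather than on timing.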
// Test: S3 rate limiter for GET operations - open_file and read_at
TEST_F(S3FileSystemTest, RateLimiterGetTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create a test file first
std::string test_file = global_test_prefix_ + "rate_limit_get_test.txt";
std::cout << "Test file path: " << test_file << std::endl;
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
// Write some data (about 1KB)
std::string test_data(1024, 'A');
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
// Set a strict total-request cap for GET operations to trigger an error
// max_speed: 10 tokens per second
// max_burst: 10 tokens (bucket size)
// limit: 11 total GET requests -- each cycle below issues two GET-limited
// requests (the open's size probe plus the read itself), so five cycles
// consume 10; the 6th read will exceed the cap and fail
int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 11);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
std::cout << "Rate limiter set: limit 11 total GET requests" << std::endl;
// First 5 read operations should succeed
for (int i = 0; i < 5; ++i) {
io::FileReaderSPtr reader;
status = s3_fs_->open_file(test_file, &reader);
ASSERT_TRUE(status.ok()) << "Failed to open file on attempt " << i + 1 << ": "
<< status.to_string();
std::vector<char> buffer(1024);
size_t bytes_read = 0;
status = reader->read_at(0, Slice(buffer.data(), 1024), &bytes_read);
ASSERT_TRUE(status.ok()) << "Failed to read file on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Read attempt " << i + 1 << " succeeded" << std::endl;
}
// 6th read operation should fail due to rate limit
io::FileReaderSPtr reader;
status = s3_fs_->open_file(test_file, &reader);
if (status.ok()) {
std::vector<char> buffer(1024);
size_t bytes_read = 0;
status = reader->read_at(0, Slice(buffer.data(), 1024), &bytes_read);
}
EXPECT_FALSE(status.ok()) << "6th read should fail due to rate limit";
std::cout << "6th read failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter to default (no limit) to avoid affecting other tests
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter GET test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
// Test: S3 rate limiter for PUT operations - trigger error with limit
TEST_F(S3FileSystemTest, RateLimiterPutTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Set a very strict rate limit for PUT operations to trigger error
// max_speed: 10 tokens per second
// max_burst: 10 tokens (bucket size)
// limit: 2 (allow only 2 requests total, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 2 total requests for PUT operations" << std::endl;
// First two write operations should succeed
std::vector<std::string> test_files;
Status status;
for (int i = 0; i < 2; ++i) {
std::string test_file =
global_test_prefix_ + "rate_limit_put_test_" + std::to_string(i) + ".txt";
test_files.push_back(test_file);
io::FileWriterPtr writer;
status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file on attempt " << i + 1 << ": "
<< status.to_string();
std::string test_data = "Test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data on attempt " << i + 1 << ": "
<< status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Write attempt " << i + 1 << " succeeded" << std::endl;
}
// Third write operation should fail due to rate limit
std::string test_file_fail = global_test_prefix_ + "rate_limit_put_test_fail.txt";
io::FileWriterPtr writer;
status = s3_fs_->create_file(test_file_fail, &writer);
if (status.ok()) {
std::string test_data = "This should fail";
status = writer->append(test_data);
if (status.ok()) {
status = writer->close();
}
}
EXPECT_FALSE(status.ok()) << "Third write should fail due to rate limit";
std::cout << "Third write failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter to default (no limit) to avoid affecting other tests
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT test completed successfully!" << std::endl;
// Cleanup successfully created files
for (const auto& file : test_files) {
status = s3_fs_->delete_file(file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
}
// Test: S3 rate limiter for GET operations - download
TEST_F(S3FileSystemTest, RateLimiterGetDownloadTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create test files first
std::vector<std::string> test_files;
for (int i = 0; i < 3; ++i) {
std::string test_file =
global_test_prefix_ + "rate_limit_download_test_" + std::to_string(i) + ".txt";
test_files.push_back(test_file);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Download test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Set a total-request cap for GET operations
// limit: 4 total GET requests -- enough for the first two downloads; the 3rd
// will exceed the cap and fail
int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 4);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
std::cout << "Rate limiter set: limit 4 total GET requests (download)"
<< std::endl;
// First two download operations should succeed
Status status;
for (int i = 0; i < 2; ++i) {
std::string local_file = "/tmp/rate_limit_download_" + std::to_string(i) + ".txt";
status = s3_fs_->download(test_files[i], local_file);
ASSERT_TRUE(status.ok()) << "Failed to download file on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Download attempt " << i + 1 << " succeeded" << std::endl;
std::remove(local_file.c_str());
}
// Third download operation should fail due to rate limit
std::string local_file_fail = "/tmp/rate_limit_download_fail.txt";
status = s3_fs_->download(test_files[2], local_file_fail);
EXPECT_FALSE(status.ok()) << "Third download should fail due to rate limit";
std::cout << "Third download failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter GET download test completed successfully!" << std::endl;
// Cleanup
for (const auto& file : test_files) {
status = s3_fs_->delete_file(file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
}
// Test: S3 rate limiter for PUT operations - multipart upload
TEST_F(S3FileSystemTest, RateLimiterPutMultipartTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Set rate limit for PUT operations with higher limit for multipart
// Each chunk in multipart upload counts as one request
// limit: 3 (allow only 3 PUT requests, multipart with 6 chunks should fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 3);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 3 total requests for PUT operations (multipart)"
<< std::endl;
std::string test_file = global_test_prefix_ + "rate_limit_multipart_test.dat";
// Create file writer
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
// Write large data to trigger multipart upload (>5MB for multipart)
// This will trigger multiple PUT requests and should fail
size_t chunk_size = 1024 * 1024; // 1MB
size_t num_chunks = 6; // 6MB total, will trigger multipart
std::string chunk_data(chunk_size, 'A');
bool failed = false;
for (size_t i = 0; i < num_chunks; ++i) {
std::ranges::fill(chunk_data, 'A' + (i % 26));
status = writer->append(chunk_data);
if (!status.ok()) {
failed = true;
std::cout << "Multipart upload failed at chunk " << i
<< " as expected: " << status.to_string() << std::endl;
break;
}
}
if (!failed) {
status = writer->close();
if (!status.ok()) {
failed = true;
std::cout << "Multipart upload failed at close as expected: " << status.to_string()
<< std::endl;
}
}
EXPECT_TRUE(failed) << "Multipart upload should fail due to rate limit";
// Reset rate limiter
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT multipart test completed successfully!" << std::endl;
// Try to cleanup (may fail if file was not created)
std::ignore = s3_fs_->delete_file(test_file);
}
// Test: S3 rate limiter for PUT operations - upload
TEST_F(S3FileSystemTest, RateLimiterPutUploadTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create local test files
std::vector<std::string> local_files;
std::vector<std::string> remote_files;
for (int i = 0; i < 3; ++i) {
std::string local_file = "/tmp/rate_limit_upload_" + std::to_string(i) + ".txt";
local_files.push_back(local_file);
std::ofstream ofs(local_file);
ASSERT_TRUE(ofs.is_open()) << "Failed to create local file";
ofs << "Upload test data " << i;
ofs.close();
remote_files.push_back(global_test_prefix_ + "rate_limit_upload_" + std::to_string(i) +
".txt");
}
// Set rate limit for PUT operations
// limit: 2 (allow only 2 requests total, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 2 total requests for PUT operations (upload)"
<< std::endl;
// First two upload operations should succeed
Status status;
for (int i = 0; i < 2; ++i) {
status = s3_fs_->upload(local_files[i], remote_files[i]);
ASSERT_TRUE(status.ok()) << "Failed to upload file on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Upload attempt " << i + 1 << " succeeded" << std::endl;
}
// Third upload operation should fail due to rate limit
status = s3_fs_->upload(local_files[2], remote_files[2]);
EXPECT_FALSE(status.ok()) << "Third upload should fail due to rate limit";
std::cout << "Third upload failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT upload test completed successfully!" << std::endl;
// Cleanup local files
for (const auto& file : local_files) {
std::remove(file.c_str());
}
// Cleanup remote files (only first 2 should exist)
for (int i = 0; i < 2; ++i) {
status = s3_fs_->delete_file(remote_files[i]);
ASSERT_TRUE(status.ok()) << "Failed to delete remote file: " << status.to_string();
}
}
// Test: S3 rate limiter for GET operations - head_object (file_size/exists)
TEST_F(S3FileSystemTest, RateLimiterGetHeadObjectTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create test files first
std::vector<std::string> test_files;
for (int i = 0; i < 3; ++i) {
std::string test_file =
global_test_prefix_ + "rate_limit_head_test_" + std::to_string(i) + ".txt";
test_files.push_back(test_file);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Head object test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Set rate limit for GET operations (head_object uses GET rate limiter)
// limit: 2 (allow only 2 requests total, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
std::cout << "Rate limiter set: limit 2 total requests for GET operations (head_object)"
<< std::endl;
// First two head operations should succeed
Status status;
for (int i = 0; i < 2; ++i) {
int64_t file_size = 0;
status = s3_fs_->file_size(test_files[i], &file_size);
ASSERT_TRUE(status.ok()) << "Failed to get file size on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Head object attempt " << i + 1 << " succeeded, size: " << file_size
<< std::endl;
}
// Third head operation should fail due to rate limit
bool exists = false;
status = s3_fs_->exists(test_files[2], &exists);
EXPECT_FALSE(status.ok()) << "Third head object should fail due to rate limit";
std::cout << "Third head object failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter GET head object test completed successfully!" << std::endl;
// Cleanup
for (const auto& file : test_files) {
status = s3_fs_->delete_file(file);
ASSERT_TRUE(status.ok()) << "Failed to delete test file: " << status.to_string();
}
}
// Test: S3 rate limiter for GET operations - list objects
TEST_F(S3FileSystemTest, RateLimiterGetListTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create test directories with files
std::vector<std::string> test_dirs;
for (int i = 0; i < 3; ++i) {
std::string test_dir =
global_test_prefix_ + "rate_limit_list_test_" + std::to_string(i) + "/";
test_dirs.push_back(test_dir);
// Create a file in each directory
std::string test_file = test_dir + "file.txt";
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "List test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Set rate limit for GET operations (list uses GET rate limiter)
// limit: 2 (allow only 2 requests total, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
std::cout << "Rate limiter set: limit 2 total requests for GET operations (list)" << std::endl;
// First two list operations should succeed
Status status;
for (int i = 0; i < 2; ++i) {
bool exists = false;
std::vector<io::FileInfo> files;
status = s3_fs_->list(test_dirs[i], true, &files, &exists);
ASSERT_TRUE(status.ok()) << "Failed to list on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "List attempt " << i + 1 << " succeeded, found " << files.size() << " files"
<< std::endl;
}
// Third list operation should fail due to rate limit
bool exists = false;
std::vector<io::FileInfo> files;
status = s3_fs_->list(test_dirs[2], true, &files, &exists);
EXPECT_FALSE(status.ok()) << "Third list should fail due to rate limit";
std::cout << "Third list failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter GET list test completed successfully!" << std::endl;
// Cleanup
for (const auto& dir : test_dirs) {
status = s3_fs_->delete_directory(dir);
ASSERT_TRUE(status.ok()) << "Failed to delete test directory: " << status.to_string();
}
}
// Test: S3 rate limiter for PUT operations - delete_file (uses PUT rate limiter)
TEST_F(S3FileSystemTest, RateLimiterPutDeleteTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create test files first (without rate limit)
std::vector<std::string> test_files;
for (int i = 0; i < 3; ++i) {
std::string test_file =
global_test_prefix_ + "rate_limit_delete_test_" + std::to_string(i) + ".txt";
test_files.push_back(test_file);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Delete test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Set rate limit for PUT operations (delete uses PUT rate limiter)
// limit: 2 (allow only 2 requests total, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 2 total requests for PUT operations (delete)"
<< std::endl;
// First two delete operations should succeed
Status status;
for (int i = 0; i < 2; ++i) {
status = s3_fs_->delete_file(test_files[i]);
ASSERT_TRUE(status.ok()) << "Failed to delete file on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Delete attempt " << i + 1 << " succeeded" << std::endl;
}
// Third delete operation should fail due to rate limit
status = s3_fs_->delete_file(test_files[2]);
EXPECT_FALSE(status.ok()) << "Third delete should fail due to rate limit";
std::cout << "Third delete failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter to clean up the remaining file
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT delete test completed successfully!" << std::endl;
// Cleanup the file that failed to delete
status = s3_fs_->delete_file(test_files[2]);
ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining file: " << status.to_string();
}
// Test: S3 rate limiter for PUT operations - batch_delete (uses PUT rate limiter)
TEST_F(S3FileSystemTest, RateLimiterPutBatchDeleteTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create multiple batches of test files
std::vector<std::vector<std::string>> file_batches;
for (int batch = 0; batch < 3; ++batch) {
std::vector<std::string> batch_files;
for (int i = 0; i < 3; ++i) {
std::string test_file = global_test_prefix_ + "rate_limit_batch_delete_batch_" +
std::to_string(batch) + "_file_" + std::to_string(i) + ".txt";
batch_files.push_back(test_file);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Batch delete test data";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
file_batches.push_back(batch_files);
}
// Set rate limit for PUT operations (batch_delete uses PUT rate limiter)
// limit: 2 (allow only 2 batch delete requests, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 2 total requests for PUT operations (batch_delete)"
<< std::endl;
// First two batch delete operations should succeed
Status status;
for (int batch = 0; batch < 2; ++batch) {
std::vector<io::Path> paths_to_delete;
for (const auto& file : file_batches[batch]) {
paths_to_delete.emplace_back(file);
}
status = s3_fs_->batch_delete(paths_to_delete);
ASSERT_TRUE(status.ok()) << "Failed to batch delete on attempt " << batch + 1 << ": "
<< status.to_string();
std::cout << "Batch delete attempt " << batch + 1 << " succeeded, deleted "
<< paths_to_delete.size() << " files" << std::endl;
}
// Third batch delete operation should fail due to rate limit
std::vector<io::Path> paths_to_delete;
for (const auto& file : file_batches[2]) {
paths_to_delete.emplace_back(file);
}
status = s3_fs_->batch_delete(paths_to_delete);
EXPECT_FALSE(status.ok()) << "Third batch delete should fail due to rate limit";
std::cout << "Third batch delete failed as expected: " << status.to_string() << std::endl;
// Reset rate limiter to clean up remaining files
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT batch delete test completed successfully!" << std::endl;
// Cleanup remaining files from failed batch
status = s3_fs_->batch_delete(paths_to_delete);
ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining files: " << status.to_string();
}
// Test: S3 rate limiter for GET operations - delete_directory with ListObjectsV2
TEST_F(S3FileSystemTest, RateLimiterGetDeleteDirectoryListTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create multiple test directories with files
std::vector<std::string> test_dirs;
for (int i = 0; i < 3; ++i) {
std::string test_dir =
global_test_prefix_ + "rate_limit_delete_dir_list_" + std::to_string(i) + "/";
test_dirs.push_back(test_dir);
// Create multiple files in each directory
for (int j = 0; j < 5; ++j) {
std::string test_file = test_dir + "file_" + std::to_string(j) + ".txt";
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Delete directory test data";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
}
// Set rate limit for GET operations (ListObjectsV2 uses GET rate limiter)
// limit: 2 (allow only 2 ListObjectsV2 requests, the 3rd will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::GET, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for GET operations";
std::cout << "Rate limiter set: limit 2 total requests for GET operations (ListObjectsV2 in "
"delete_directory)"
<< std::endl;
// First two delete_directory operations should succeed
// Each delete_directory calls ListObjectsV2 (GET) and DeleteObjects (PUT)
Status status;
for (int i = 0; i < 2; ++i) {
status = s3_fs_->delete_directory(test_dirs[i]);
ASSERT_TRUE(status.ok()) << "Failed to delete directory on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Delete directory attempt " << i + 1 << " succeeded" << std::endl;
}
// Third delete_directory operation should fail due to rate limit on ListObjectsV2
status = s3_fs_->delete_directory(test_dirs[2]);
EXPECT_FALSE(status.ok())
<< "Third delete_directory should fail due to rate limit on ListObjectsV2";
std::cout << "Third delete_directory failed as expected (ListObjectsV2 rate limit): "
<< status.to_string() << std::endl;
// Reset rate limiter to clean up remaining directory
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter GET delete_directory (ListObjectsV2) test completed successfully!"
<< std::endl;
// Cleanup remaining directory
status = s3_fs_->delete_directory(test_dirs[2]);
ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << status.to_string();
}
// Test: S3 rate limiter for PUT operations - multipart upload with UploadPart failure
TEST_F(S3FileSystemTest, RateLimiterPutMultipartUploadPartFailureTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Set a very strict rate limit for PUT operations
// limit: 1 (allow only 1 PUT request: CreateMultipartUpload will succeed, but UploadPart will fail)
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 1);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 1 total request for PUT operations (UploadPart will fail)"
<< std::endl;
std::string test_file = global_test_prefix_ + "rate_limit_upload_part_test.dat";
// Create file writer
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
// Write large data to trigger multipart upload (>5MB for multipart)
// This will trigger CreateMultipartUpload (1st PUT, succeeds) and UploadPart (2nd PUT, should fail)
size_t chunk_size = 1024 * 1024; // 1MB
size_t num_chunks = 6; // 6MB total, will trigger multipart
std::string chunk_data(chunk_size, 'A');
std::cout << "Writing " << (chunk_size * num_chunks) << " bytes to trigger multipart upload"
<< std::endl;
bool write_failed = false;
for (size_t i = 0; i < num_chunks; ++i) {
std::ranges::fill(chunk_data, 'A' + (i % 26));
status = writer->append(chunk_data);
if (!status.ok()) {
write_failed = true;
std::cout << "Write failed at chunk " << i << " as expected: " << status.to_string()
<< std::endl;
break;
}
}
if (!write_failed) {
// If write succeeded, close should fail
status = writer->close();
if (!status.ok()) {
write_failed = true;
std::cout << "Close failed as expected (UploadPart rate limit): " << status.to_string()
<< std::endl;
}
}
EXPECT_TRUE(write_failed) << "Multipart upload should fail due to UploadPart rate limit";
// Verify the error message contains information about UploadPart failure
if (write_failed) {
std::string error_msg = status.to_string();
// The error should be about upload failure
std::cout << "Verified UploadPart failure with error: " << error_msg << std::endl;
}
// Reset rate limiter
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT multipart UploadPart failure test completed successfully!"
<< std::endl;
// Try to cleanup (may fail if file was not created)
std::ignore = s3_fs_->delete_file(test_file);
}
// Test: S3 DeleteObjects with partial failure - some objects fail to delete
TEST_F(S3FileSystemTest, DeleteDirectoryPartialFailureTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_dir = global_test_prefix_ + "delete_partial_failure_test/";
std::cout << "Test directory path: " << test_dir << std::endl;
// Create multiple test files
std::vector<std::string> test_files;
for (int i = 0; i < 5; ++i) {
std::string test_file = test_dir + "file_" + std::to_string(i) + ".txt";
test_files.push_back(test_file);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Set up sync point to simulate partial delete failure
// When DeleteObjects is called, we'll inject errors into the result
SyncPoint::get_instance()->set_call_back(
"s3_obj_storage_client::delete_objects_recursively", [](std::vector<std::any>&& args) {
// The callback receives delete_outcome as argument
// delete_outcome is of type: Aws::Utils::Outcome<DeleteObjectsResult, S3Error>*
using DeleteObjectsOutcome =
Aws::Utils::Outcome<Aws::S3::Model::DeleteObjectsResult, Aws::S3::S3Error>;
auto* delete_outcome = std::any_cast<DeleteObjectsOutcome*>(args[0]);
// Create a mock error for one of the objects
Aws::S3::Model::Error error;
error.SetKey("file_1.txt");
error.SetCode("AccessDenied");
error.SetMessage("Simulated partial delete failure");
// Get the mutable result and add error
// Note: We need to create a new result with errors
auto& result = const_cast<Aws::S3::Model::DeleteObjectsResult&>(
delete_outcome->GetResult());
Aws::Vector<Aws::S3::Model::Error> errors;
errors.push_back(error);
result.SetErrors(errors);
});
// Enable sync point processing
SyncPoint::get_instance()->enable_processing();
// Try to delete directory - should fail due to partial delete failure
Status status = s3_fs_->delete_directory(test_dir);
// The delete should fail because some objects failed to delete
EXPECT_FALSE(status.ok()) << "Delete directory should fail due to partial delete failure";
std::cout << "Delete directory failed as expected with partial failure: " << status.to_string()
<< std::endl;
// Verify error message contains information about the failed object
std::string error_msg = status.to_string();
std::cout << "Error message: " << error_msg << std::endl;
// Disable sync point processing and clear callbacks
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
std::cout << "Delete directory partial failure test completed successfully!" << std::endl;
// Cleanup - now without sync point, it should succeed
status = s3_fs_->delete_directory(test_dir);
ASSERT_TRUE(status.ok()) << "Failed to cleanup test directory: " << status.to_string();
}
// Test: S3 batch_delete (delete_objects) with partial failure - some objects fail to delete
TEST_F(S3FileSystemTest, BatchDeletePartialFailureTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_dir = global_test_prefix_ + "batch_delete_partial_failure_test/";
std::cout << "Test directory path: " << test_dir << std::endl;
// Create multiple test files
std::vector<std::string> test_files;
for (int i = 0; i < 5; ++i) {
std::string test_file = test_dir + "file_" + std::to_string(i) + ".txt";
test_files.push_back(test_file);
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Batch delete test data " + std::to_string(i);
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
// Verify all files exist
for (const auto& file_path : test_files) {
bool exists = false;
Status status = s3_fs_->exists(file_path, &exists);
ASSERT_TRUE(status.ok()) << "Failed to check file existence: " << status.to_string();
EXPECT_TRUE(exists) << "File should exist: " << file_path;
}
// Set up sync point to simulate partial delete failure in batch_delete
// When DeleteObjects is called via batch_delete, we'll inject errors into the result
SyncPoint::get_instance()->set_call_back(
"s3_obj_storage_client::delete_objects", [](std::vector<std::any>&& args) {
// The callback receives delete_outcome as argument
// delete_outcome is of type: Aws::Utils::Outcome<DeleteObjectsResult, S3Error>*
using DeleteObjectsOutcome =
Aws::Utils::Outcome<Aws::S3::Model::DeleteObjectsResult, Aws::S3::S3Error>;
auto* delete_outcome = std::any_cast<DeleteObjectsOutcome*>(args[0]);
// Create mock errors for some of the objects
Aws::S3::Model::Error error1;
error1.SetKey("file_2.txt");
error1.SetCode("AccessDenied");
error1.SetMessage("Simulated batch delete partial failure for file_2");
Aws::S3::Model::Error error2;
error2.SetKey("file_4.txt");
error2.SetCode("InternalError");
error2.SetMessage("Simulated batch delete partial failure for file_4");
// GetResult() returns a const reference, so cast away constness and inject
// the errors into the existing result in place
auto& result = const_cast<Aws::S3::Model::DeleteObjectsResult&>(
delete_outcome->GetResult());
Aws::Vector<Aws::S3::Model::Error> errors;
errors.push_back(error1);
errors.push_back(error2);
result.SetErrors(errors);
});
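// Only file_2 and file_4 are marked as failed; the other three keys are
// treated as deleted, giving the partial-failure shape under test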
// Enable sync point processing
SyncPoint::get_instance()->enable_processing();
// Try to batch delete files - should fail due to partial delete failure
std::vector<io::Path> paths_to_delete;
for (const auto& file_path : test_files) {
paths_to_delete.emplace_back(file_path);
}
Status status = s3_fs_->batch_delete(paths_to_delete);
// The batch_delete should fail because some objects failed to delete
EXPECT_FALSE(status.ok()) << "Batch delete should fail due to partial delete failure";
std::cout << "Batch delete failed as expected with partial failure: " << status.to_string()
<< std::endl;
// Verify error message contains information about the failed object
std::string error_msg = status.to_string();
std::cout << "Error message: " << error_msg << std::endl;
// The error should mention one of the failed files
EXPECT_TRUE(error_msg.find("file_2") != std::string::npos ||
error_msg.find("file_4") != std::string::npos)
<< "Error message should mention failed file";
// Disable sync point processing and clear callbacks
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
std::cout << "Batch delete partial failure test completed successfully!" << std::endl;
// Cleanup - now without sync point, it should succeed
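// (As above, the real DeleteObjects succeeded and only the reported result
// was doctored; S3 treats deleting already-missing keys as success)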
status = s3_fs_->batch_delete(paths_to_delete);
ASSERT_TRUE(status.ok()) << "Failed to cleanup test files: " << status.to_string();
}
// Test: S3 get_object with incomplete read - fewer bytes returned than requested
TEST_F(S3FileSystemTest, GetObjectIncompleteReadTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_file = global_test_prefix_ + "incomplete_read_test.txt";
std::cout << "Test file path: " << test_file << std::endl;
// Create a test file with known content
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data =
"This is test data for incomplete read simulation. "
"The content should be long enough to test partial reads.";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
std::cout << "Created test file with " << test_data.size() << " bytes" << std::endl;
// Open file for reading
io::FileReaderSPtr reader;
status = s3_fs_->open_file(test_file, &reader);
ASSERT_TRUE(status.ok()) << "Failed to open file: " << status.to_string();
ASSERT_NE(reader, nullptr);
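// Each read_at below is expected to translate into a GetObject request,
// which is what the sync point installed next intercepts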
// Set up sync point to simulate an incomplete read
// When get_object is called, we'll shrink size_return below the requested size
SyncPoint::get_instance()->set_call_back(
"s3_obj_storage_client::get_object", [](std::vector<std::any>&& args) {
// The callback receives size_return as argument
auto* size_return = std::any_cast<size_t*>(args[0]);
// Simulate an incomplete read by halving the reported size,
// e.g. a 50-byte request appears to return only 25 bytes
size_t original_size = *size_return;
*size_return = original_size / 2; // Return only half of what was requested
std::cout << "SyncPoint: Modified size_return from " << original_size << " to "
<< *size_return << std::endl;
});
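// read_at compares the size reported by get_object against the size it
// requested, so shrinking size_return is enough to trip that check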
// Enable sync point processing
SyncPoint::get_instance()->enable_processing();
// Try to read from file - should fail due to incomplete read
size_t read_size = 50;
std::vector<char> buffer(read_size);
size_t bytes_read = 0;
status = reader->read_at(0, Slice(buffer.data(), read_size), &bytes_read);
// The read should fail because fewer bytes were reported than requested
EXPECT_FALSE(status.ok()) << "Read should fail due to incomplete read";
std::cout << "Read failed as expected with incomplete read: " << status.to_string()
<< std::endl;
// Verify error message contains information about the mismatch
std::string error_msg = status.to_string();
std::cout << "Error message: " << error_msg << std::endl;
EXPECT_TRUE(error_msg.find("bytes read") != std::string::npos ||
error_msg.find("bytes req") != std::string::npos)
<< "Error message should mention byte count mismatch";
// Disable sync point processing and clear callbacks
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
std::cout << "Get object incomplete read test completed successfully!" << std::endl;
// Cleanup
status = s3_fs_->delete_file(test_file);
ASSERT_TRUE(status.ok()) << "Failed to cleanup test file: " << status.to_string();
}
// Test: S3 rate limiter for PUT operations - delete_directory with DeleteObjects
TEST_F(S3FileSystemTest, RateLimiterPutDeleteDirectoryDeleteObjectsTest) {
ASSERT_NE(s3_fs_, nullptr);
// Save original config value
bool original_enable_rate_limiter = config::enable_s3_rate_limiter;
// Enable S3 rate limiter for this test
config::enable_s3_rate_limiter = true;
// Create multiple test directories with files
std::vector<std::string> test_dirs;
for (int i = 0; i < 3; ++i) {
std::string test_dir =
global_test_prefix_ + "rate_limit_delete_dir_objects_" + std::to_string(i) + "/";
test_dirs.push_back(test_dir);
// Create multiple files in each directory
for (int j = 0; j < 5; ++j) {
std::string test_file = test_dir + "file_" + std::to_string(j) + ".txt";
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file: " << status.to_string();
std::string test_data = "Delete directory test data";
status = writer->append(test_data);
ASSERT_TRUE(status.ok()) << "Failed to write data: " << status.to_string();
status = writer->close();
ASSERT_TRUE(status.ok()) << "Failed to close writer: " << status.to_string();
}
}
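// The loops above issued 15 PUT requests (3 directories x 5 files each)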
// Set rate limit for PUT operations (DeleteObjects uses PUT rate limiter)
// limit: 2 (allow only 2 DeleteObjects requests, the 3rd will fail)
// Note: reset_s3_rate_limiter resets the internal request counter, so the
// file creation PUT requests above do not count against this limit
int ret = reset_s3_rate_limiter(S3RateLimitType::PUT, 10, 10, 2);
ASSERT_EQ(ret, 0) << "Failed to set rate limiter for PUT operations";
std::cout << "Rate limiter set: limit 2 total requests for PUT operations (DeleteObjects in "
"delete_directory)"
<< std::endl;
// First two delete_directory operations should succeed
// Each delete_directory calls ListObjectsV2 (GET) and DeleteObjects (PUT)
Status status;
for (int i = 0; i < 2; ++i) {
status = s3_fs_->delete_directory(test_dirs[i]);
ASSERT_TRUE(status.ok()) << "Failed to delete directory on attempt " << i + 1 << ": "
<< status.to_string();
std::cout << "Delete directory attempt " << i + 1 << " succeeded" << std::endl;
}
// Third delete_directory operation should fail due to rate limit on DeleteObjects
status = s3_fs_->delete_directory(test_dirs[2]);
EXPECT_FALSE(status.ok())
<< "Third delete_directory should fail due to rate limit on DeleteObjects";
std::cout << "Third delete_directory failed as expected (DeleteObjects rate limit): "
<< status.to_string() << std::endl;
// Reset rate limiter to clean up remaining directory
reset_s3_rate_limiter(S3RateLimitType::PUT, 10000, 10000, 0);
reset_s3_rate_limiter(S3RateLimitType::GET, 10000, 10000, 0);
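// A limit of 0 is assumed to disable request counting, so the cleanup below
// runs unthrottled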
// Restore original config
config::enable_s3_rate_limiter = original_enable_rate_limiter;
std::cout << "Rate limiter PUT delete_directory (DeleteObjects) test completed successfully!"
<< std::endl;
// Cleanup remaining directory
status = s3_fs_->delete_directory(test_dirs[2]);
ASSERT_TRUE(status.ok()) << "Failed to cleanup remaining directory: " << status.to_string();
}
// Test: S3 CreateMultipartUpload failure - simulates error when initiating multipart upload
TEST_F(S3FileSystemTest, CreateMultipartUploadFailureTest) {
ASSERT_NE(s3_fs_, nullptr);
std::string test_file = global_test_prefix_ + "create_multipart_upload_failure_test.dat";
std::cout << "Test file path: " << test_file << std::endl;
// Set up sync point to simulate CreateMultipartUpload failure
SyncPoint::get_instance()->set_call_back("s3_file_writer::_open", [](std::vector<std::any>&&
args) {
// The callback receives create_multipart_upload_outcome as argument
using CreateMultipartUploadOutcome =
Aws::Utils::Outcome<Aws::S3::Model::CreateMultipartUploadResult, Aws::S3::S3Error>;
auto* outcome = std::any_cast<CreateMultipartUploadOutcome*>(args[0]);
// Create a mock S3 error to simulate CreateMultipartUpload failure
Aws::S3::S3Error error;
error.SetResponseCode(Aws::Http::HttpResponseCode::FORBIDDEN);
error.SetExceptionName("AccessDenied");
error.SetMessage("Simulated CreateMultipartUpload failure - Access Denied");
error.SetRequestId("test-request-id-12345");
// Replace the successful outcome with a failure outcome
*outcome = CreateMultipartUploadOutcome(error);
std::cout << "SyncPoint: Injected CreateMultipartUpload failure" << std::endl;
});
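// Overwriting the outcome before s3_file_writer::_open inspects it makes
// IsSuccess() return false, so the writer behaves as if S3 itself had
// rejected the CreateMultipartUpload request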
// Enable sync point processing
SyncPoint::get_instance()->enable_processing();
// Try to create a large file that will trigger multipart upload
io::FileWriterPtr writer;
Status status = s3_fs_->create_file(test_file, &writer);
ASSERT_TRUE(status.ok()) << "Failed to create file writer: " << status.to_string();
// Write large data to trigger multipart upload (>5MB for multipart)
size_t chunk_size = 1024 * 1024; // 1MB
size_t num_chunks = 6; // 6MB total, will trigger multipart
std::string chunk_data(chunk_size, 'A');
std::cout << "Writing " << (chunk_size * num_chunks)
<< " bytes to trigger CreateMultipartUpload" << std::endl;
bool write_failed = false;
for (size_t i = 0; i < num_chunks; ++i) {
std::ranges::fill(chunk_data, 'A' + (i % 26));
status = writer->append(chunk_data);
if (!status.ok()) {
write_failed = true;
std::cout << "Write failed at chunk " << i << " as expected: " << status.to_string()
<< std::endl;
break;
}
}
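// Depending on buffering, the injected failure can surface either during
// append (once the buffer spills and the multipart upload is opened) or only
// at close; both paths are accepted below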
if (!write_failed) {
// If write succeeded, close should fail
status = writer->close();
if (!status.ok()) {
write_failed = true;
std::cout << "Close failed as expected (CreateMultipartUpload failure): "
<< status.to_string() << std::endl;
}
}
EXPECT_TRUE(write_failed)
<< "Multipart upload should fail due to CreateMultipartUpload failure";
// Verify the error message contains information about CreateMultipartUpload failure
if (write_failed) {
std::string error_msg = status.to_string();
std::cout << "Verified CreateMultipartUpload failure with error: " << error_msg
<< std::endl;
// Check for expected error indicators
bool has_expected_error = error_msg.find("CreateMultipartUpload") != std::string::npos ||
error_msg.find("AccessDenied") != std::string::npos ||
error_msg.find("Access Denied") != std::string::npos;
EXPECT_TRUE(has_expected_error)
<< "Error message should mention CreateMultipartUpload or AccessDenied failure";
}
// Disable sync point processing and clear callbacks
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
std::cout << "CreateMultipartUpload failure test completed successfully!" << std::endl;
// Try to cleanup (file should not exist since upload failed)
std::ignore = s3_fs_->delete_file(test_file);
}
} // namespace doris