blob: 8ff4b52a06136a46e3d61faa03844e479119cdf0 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/s3_accessor.h"
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3ServiceClientModel.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h>
#include <aws/s3/model/GetBucketVersioningRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/LifecycleRule.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <thread>
#include "common/configbase.h"
#include "common/logging.h"
#include "common/sync_point.h"
#include "mock_accessor.h"
std::unique_ptr<cloud::MockAccessor> _mock_fs;
class S3ClientInterface {
public:
S3ClientInterface() = default;
virtual ~S3ClientInterface() = default;
virtual Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(
const Aws::S3::Model::ListObjectsV2Request& req) = 0;
virtual Aws::S3::Model::DeleteObjectsOutcome DeleteObjects(
const Aws::S3::Model::DeleteObjectsRequest& req) = 0;
virtual Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& req) = 0;
virtual Aws::S3::Model::HeadObjectOutcome HeadObject(
const Aws::S3::Model::HeadObjectRequest& req) = 0;
virtual Aws::S3::Model::GetBucketLifecycleConfigurationOutcome GetBucketLifecycleConfiguration(
const Aws::S3::Model::GetBucketLifecycleConfigurationRequest& req) = 0;
virtual Aws::S3::Model::GetBucketVersioningOutcome GetBucketVersioning(
const Aws::S3::Model::GetBucketVersioningRequest& req) = 0;
};
static bool list_object_v2_with_expire_time = false;
static int64_t expire_time = 0;
static bool set_bucket_lifecycle = false;
static bool set_bucket_versioning_status_error = false;
class S3Client : public S3ClientInterface {
public:
S3Client() = default;
~S3Client() override = default;
Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(
const Aws::S3::Model::ListObjectsV2Request& req) override {
auto prefix = req.GetPrefix();
auto continuation_token =
req.ContinuationTokenHasBeenSet() ? req.GetContinuationToken() : "";
bool truncated = true;
std::vector<cloud::ObjectMeta> files;
size_t num = 0;
do {
_mock_fs->list(prefix, &files);
if (num == files.size()) {
truncated = false;
break;
}
num = files.size();
auto path1 = files.back().path;
prefix = path1.back() += 1;
} while (files.size() <= 1000);
Aws::S3::Model::ListObjectsV2Result result;
result.SetIsTruncated(truncated);
std::vector<Aws::S3::Model::Object> objects;
std::for_each(files.begin(), files.end(), [&](const cloud::ObjectMeta& file) {
Aws::S3::Model::Object obj;
obj.SetKey(file.path);
Aws::Utils::DateTime date;
if (list_object_v2_with_expire_time) {
date = Aws::Utils::DateTime(expire_time);
}
obj.SetLastModified(date);
objects.emplace_back(std::move(obj));
});
result.SetContents(std::move(objects));
return Aws::S3::Model::ListObjectsV2Outcome(std::move(result));
}
Aws::S3::Model::DeleteObjectsOutcome DeleteObjects(
const Aws::S3::Model::DeleteObjectsRequest& req) override {
Aws::S3::Model::DeleteObjectsResult result;
const auto& deletes = req.GetDelete();
for (const auto& obj : deletes.GetObjects()) {
_mock_fs->delete_object(obj.GetKey());
}
return Aws::S3::Model::DeleteObjectsOutcome(std::move(result));
}
Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& req) override {
Aws::S3::Model::PutObjectResult result;
const auto& key = req.GetKey();
_mock_fs->put_object(key, "");
return Aws::S3::Model::PutObjectOutcome(std::move(result));
}
Aws::S3::Model::HeadObjectOutcome HeadObject(
const Aws::S3::Model::HeadObjectRequest& req) override {
Aws::S3::Model::HeadObjectResult result;
const auto& key = req.GetKey();
auto v = _mock_fs->exist(key);
if (v == 1) {
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(
Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false);
err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
return Aws::S3::Model::HeadObjectOutcome(std::move(err));
}
return Aws::S3::Model::HeadObjectOutcome(std::move(result));
}
Aws::S3::Model::GetBucketLifecycleConfigurationOutcome GetBucketLifecycleConfiguration(
const Aws::S3::Model::GetBucketLifecycleConfigurationRequest& req) override {
Aws::S3::Model::GetBucketLifecycleConfigurationResult result;
Aws::Vector<Aws::S3::Model::LifecycleRule> rules;
if (set_bucket_lifecycle) {
Aws::S3::Model::LifecycleRule rule;
Aws::S3::Model::NoncurrentVersionExpiration expiration;
expiration.SetNoncurrentDays(1000);
rule.SetNoncurrentVersionExpiration(expiration);
rules.emplace_back(std::move(rule));
}
result.SetRules(std::move(rules));
return Aws::S3::Model::GetBucketLifecycleConfigurationOutcome(std::move(result));
}
Aws::S3::Model::GetBucketVersioningOutcome GetBucketVersioning(
const Aws::S3::Model::GetBucketVersioningRequest& req) override {
Aws::S3::Model::GetBucketVersioningResult result;
if (set_bucket_versioning_status_error) {
result.SetStatus(Aws::S3::Model::BucketVersioningStatus::Suspended);
} else {
result.SetStatus(Aws::S3::Model::BucketVersioningStatus::Enabled);
}
return Aws::S3::Model::GetBucketVersioningOutcome(std::move(result));
}
};
static bool return_error_for_error_s3_client = false;
static bool delete_objects_return_part_error = false;
class ErrorS3Client : public S3ClientInterface {
public:
ErrorS3Client() : _correct_impl(std::make_unique<S3Client>()) {}
~ErrorS3Client() override = default;
Aws::S3::Model::ListObjectsV2Outcome ListObjectsV2(
const Aws::S3::Model::ListObjectsV2Request& req) override {
if (!return_error_for_error_s3_client) {
return _correct_impl->ListObjectsV2(req);
}
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
false);
err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
return Aws::S3::Model::ListObjectsV2Outcome(std::move(err));
}
Aws::S3::Model::DeleteObjectsOutcome DeleteObjects(
const Aws::S3::Model::DeleteObjectsRequest& req) override {
if (!delete_objects_return_part_error) {
Aws::S3::Model::DeleteObjectsResult result;
Aws::Vector<Aws::S3::Model::Error> errors;
Aws::S3::Model::Error error;
errors.emplace_back(std::move(error));
result.SetErrors(std::move(errors));
return Aws::S3::Model::DeleteObjectsOutcome(std::move(result));
}
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
false);
err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
// return -1
return Aws::S3::Model::DeleteObjectsOutcome(std::move(err));
}
Aws::S3::Model::PutObjectOutcome PutObject(
const Aws::S3::Model::PutObjectRequest& req) override {
if (!return_error_for_error_s3_client) {
return _correct_impl->PutObject(req);
}
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
false);
err.SetResponseCode(Aws::Http::HttpResponseCode::NOT_FOUND);
return Aws::S3::Model::PutObjectOutcome(std::move(err));
}
Aws::S3::Model::HeadObjectOutcome HeadObject(
const Aws::S3::Model::HeadObjectRequest& req) override {
if (!return_error_for_error_s3_client) {
return _correct_impl->HeadObject(req);
}
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
false);
err.SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
return Aws::S3::Model::HeadObjectOutcome(std::move(err));
}
Aws::S3::Model::GetBucketLifecycleConfigurationOutcome GetBucketLifecycleConfiguration(
const Aws::S3::Model::GetBucketLifecycleConfigurationRequest& req) override {
if (!return_error_for_error_s3_client) {
return _correct_impl->GetBucketLifecycleConfiguration(req);
}
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
false);
err.SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
return Aws::S3::Model::GetBucketLifecycleConfigurationOutcome(std::move(err));
}
Aws::S3::Model::GetBucketVersioningOutcome GetBucketVersioning(
const Aws::S3::Model::GetBucketVersioningRequest& req) override {
if (!return_error_for_error_s3_client) {
return _correct_impl->GetBucketVersioning(req);
}
auto err = Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND,
false);
err.SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
return Aws::S3::Model::GetBucketVersioningOutcome(std::move(err));
}
private:
std::unique_ptr<S3ClientInterface> _correct_impl;
};
class MockS3Client {
public:
MockS3Client(std::unique_ptr<S3ClientInterface> impl = std::make_unique<S3Client>())
: _impl(std::move(impl)) {}
auto ListObjectsV2(const Aws::S3::Model::ListObjectsV2Request& req) {
return _impl->ListObjectsV2(req);
}
auto DeleteObjects(const Aws::S3::Model::DeleteObjectsRequest& req) {
return _impl->DeleteObjects(req);
}
auto PutObject(const Aws::S3::Model::PutObjectRequest& req) { return _impl->PutObject(req); }
auto HeadObject(const Aws::S3::Model::HeadObjectRequest& req) { return _impl->HeadObject(req); }
auto GetBucketLifecycleConfiguration(
const Aws::S3::Model::GetBucketLifecycleConfigurationRequest& req) {
return _impl->GetBucketLifecycleConfiguration(req);
}
auto GetBucketVersioning(const Aws::S3::Model::GetBucketVersioningRequest& req) {
return _impl->GetBucketVersioning(req);
}
private:
std::unique_ptr<S3ClientInterface> _impl;
};
std::unique_ptr<MockS3Client> _mock_client;
struct MockCallable {
std::string point_name;
std::function<void(void*)> func;
};
static auto callbacks = std::array {
MockCallable {"s3_client::list_objects_v2",
[](void* p) {
auto pair = *(std::pair<Aws::S3::Model::ListObjectsV2Outcome*,
Aws::S3::Model::ListObjectsV2Request*>*)p;
*pair.first = (*_mock_client).ListObjectsV2(*pair.second);
}},
MockCallable {"s3_client::delete_objects",
[](void* p) {
auto pair = *(std::pair<Aws::S3::Model::DeleteObjectsOutcome*,
Aws::S3::Model::DeleteObjectsRequest*>*)p;
*pair.first = (*_mock_client).DeleteObjects(*pair.second);
}},
MockCallable {"s3_client::put_object",
[](void* p) {
auto pair = *(std::pair<Aws::S3::Model::PutObjectOutcome*,
Aws::S3::Model::PutObjectRequest*>*)p;
*pair.first = (*_mock_client).PutObject(*pair.second);
}},
MockCallable {"s3_client::head_object",
[](void* p) {
auto pair = *(std::pair<Aws::S3::Model::HeadObjectOutcome*,
Aws::S3::Model::HeadObjectRequest*>*)p;
*pair.first = (*_mock_client).HeadObject(*pair.second);
}},
MockCallable {
"s3_client::get_bucket_lifecycle_configuration",
[](void* p) {
auto pair =
*(std::pair<Aws::S3::Model::GetBucketLifecycleConfigurationOutcome*,
Aws::S3::Model::GetBucketLifecycleConfigurationRequest*>*)p;
*pair.first = (*_mock_client).GetBucketLifecycleConfiguration(*pair.second);
}},
MockCallable {"s3_client::get_bucket_versioning", [](void* p) {
auto pair = *(std::pair<Aws::S3::Model::GetBucketVersioningOutcome*,
Aws::S3::Model::GetBucketVersioningRequest*>*)p;
*pair.first = (*_mock_client).GetBucketVersioning(*pair.second);
}}};
int main(int argc, char** argv) {
const std::string conf_file = "doris_cloud.conf";
if (!cloud::config::init(conf_file.c_str(), true)) {
std::cerr << "failed to init config file, conf=" << conf_file << std::endl;
return -1;
}
if (!cloud::init_glog("s3_accessor_test")) {
std::cerr << "failed to init glog" << std::endl;
return -1;
}
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
namespace cloud {
std::string get_key(const std::string& relative_path) {
return fmt::format("/{}", relative_path);
}
void create_file_under_prefix(std::string_view prefix, size_t file_nums) {
for (size_t i = 0; i < file_nums; i++) {
_mock_fs->put_object(get_key(fmt::format("{}{}", prefix, i)), "");
}
}
TEST(S3AccessorTest, init) {
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
ASSERT_EQ(0, accessor->init());
}
TEST(S3AccessorTest, check_bucket_versioning) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
set_bucket_versioning_status_error = false;
});
{ ASSERT_EQ(0, accessor->check_bucket_versioning()); }
{
set_bucket_versioning_status_error = true;
ASSERT_EQ(-1, accessor->check_bucket_versioning());
}
}
TEST(S3AccessorTest, check_bucket_versioning_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
return_error_for_error_s3_client = true;
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
});
ASSERT_EQ(-1, accessor->check_bucket_versioning());
}
TEST(S3AccessorTest, get_bucket_lifecycle) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
set_bucket_lifecycle = false;
});
{
int64_t expiration_time = 0;
ASSERT_EQ(-1, accessor->get_bucket_lifecycle(&expiration_time));
}
{
set_bucket_lifecycle = true;
int64_t expiration_time = 0;
ASSERT_EQ(0, accessor->get_bucket_lifecycle(&expiration_time));
}
}
TEST(S3AccessorTest, get_bucket_lifecycle_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
return_error_for_error_s3_client = true;
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
});
int64_t expiration_time = 0;
ASSERT_EQ(-1, accessor->get_bucket_lifecycle(&expiration_time));
}
TEST(S3AccessorTest, list) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
create_file_under_prefix("test_list", 300);
std::vector<ObjectMeta> files;
ASSERT_EQ(0, accessor->list("test_list", &files));
ASSERT_EQ(300, files.size());
}
TEST(S3AccessorTest, list_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
return_error_for_error_s3_client = true;
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
});
create_file_under_prefix("test_list", 300);
std::vector<ObjectMeta> files;
ASSERT_EQ(-1, accessor->list("test_list", &files));
}
TEST(S3AccessorTest, put) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
std::string prefix = "test_put";
for (size_t i = 0; i < 300; i++) {
ASSERT_EQ(0, accessor->put_object(fmt::format("{}{}", prefix, i), ""));
}
std::vector<ObjectMeta> files;
ASSERT_EQ(0, accessor->list("test_put", &files));
ASSERT_EQ(300, files.size());
}
TEST(S3AccessorTest, put_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
});
std::string prefix = "test_put_error";
for (size_t i = 0; i < 300; i++) {
if (i % 2) {
return_error_for_error_s3_client = true;
ASSERT_EQ(-1, accessor->put_object(fmt::format("{}{}", prefix, i), ""));
return_error_for_error_s3_client = false;
break;
}
ASSERT_EQ(0, accessor->put_object(fmt::format("{}{}", prefix, i), ""));
}
std::vector<ObjectMeta> files;
ASSERT_EQ(0, accessor->list("test_put_error", &files));
}
TEST(S3AccessorTest, exist) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
std::string prefix = "test_exist";
ASSERT_EQ(1, accessor->exist(prefix));
ASSERT_EQ(0, accessor->put_object(prefix, ""));
ASSERT_EQ(0, accessor->exist(prefix));
}
TEST(S3AccessorTest, exist_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
});
std::string prefix = "test_exist_error";
ASSERT_EQ(1, accessor->exist(prefix));
ASSERT_EQ(0, accessor->put_object(prefix, ""));
return_error_for_error_s3_client = true;
ASSERT_EQ(-1, accessor->exist(prefix));
}
// function is not implemented
TEST(S3AccessorTest, DISABLED_delete_object) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
std::string prefix = "test_delete_object";
create_file_under_prefix(prefix, 200);
for (size_t i = 0; i < 200; i++) {
auto path = fmt::format("{}{}", prefix, i);
ASSERT_EQ(0, accessor->delete_object(path));
ASSERT_EQ(1, accessor->exist(path));
}
}
TEST(S3AccessorTest, delete_objects) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
std::string prefix = "test_delete_objects";
std::vector<std::string> paths;
size_t num = 300;
for (size_t i = 0; i < num; i++) {
auto path = fmt::format("{}{}", prefix, i);
_mock_fs->put_object(path, "");
paths.emplace_back(std::move(path));
}
ASSERT_EQ(0, accessor->delete_objects(paths));
for (size_t i = 0; i < num; i++) {
auto path = fmt::format("{}{}", prefix, i);
ASSERT_EQ(1, accessor->exist(path));
}
}
TEST(S3AccessorTest, delete_objects_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
delete_objects_return_part_error = false;
});
std::string prefix = "test_delete_objects";
std::vector<std::string> paths_first_half;
std::vector<std::string> paths_second_half;
size_t num = 300;
for (size_t i = 0; i < num; i++) {
auto path = fmt::format("{}{}", prefix, i);
_mock_fs->put_object(path, "");
if (i < 150) {
paths_first_half.emplace_back(std::move(path));
} else {
paths_second_half.emplace_back(std::move(path));
}
}
std::vector<std::string> empty;
ASSERT_EQ(0, accessor->delete_objects(empty));
return_error_for_error_s3_client = true;
delete_objects_return_part_error = true;
ASSERT_EQ(-1, accessor->delete_objects(paths_first_half));
delete_objects_return_part_error = false;
ASSERT_EQ(-2, accessor->delete_objects(paths_second_half));
}
TEST(S3AccessorTest, delete_expired_objects) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
{
std::string prefix = "atest_delete_expired_objects";
size_t num = 2000;
create_file_under_prefix(prefix, num);
list_object_v2_with_expire_time = true;
expire_time = 50;
ASSERT_EQ(0, accessor->delete_expired_objects(prefix, 100));
for (size_t i = 0; i < num; i++) {
auto path = fmt::format("{}{}", prefix, i);
ASSERT_EQ(1, accessor->exist(path));
}
}
{
std::string prefix = "btest_delete_expired_objects";
size_t num = 2000;
create_file_under_prefix(prefix, num);
list_object_v2_with_expire_time = true;
expire_time = 150;
ASSERT_EQ(0, accessor->delete_expired_objects(prefix, 100));
for (size_t i = 0; i < num; i++) {
auto path = fmt::format("{}{}", prefix, i);
ASSERT_EQ(1, accessor->exist(path));
}
}
{
std::string prefix = "ctest_delete_expired_objects";
size_t num = 2000;
create_file_under_prefix(prefix, num);
list_object_v2_with_expire_time = true;
expire_time = 150;
return_error_for_error_s3_client = true;
std::unique_ptr<int, std::function<void(int*)>> defer(
(int*)0x01, [&](int*) { return_error_for_error_s3_client = false; });
ASSERT_EQ(0, accessor->delete_expired_objects(prefix, 100));
}
}
TEST(S3AccessorTest, delete_object_by_prefix) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
});
std::string prefix = "test_delete_objects_by_prefix";
size_t num = 2000;
create_file_under_prefix(prefix, num);
ASSERT_EQ(0, accessor->delete_objects_by_prefix(prefix));
for (size_t i = 0; i < num; i++) {
auto path = fmt::format("{}{}", prefix, i);
ASSERT_EQ(1, accessor->exist(path));
}
}
TEST(S3AccessorTest, delete_object_by_prefix_error) {
_mock_fs = std::make_unique<cloud::MockAccessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
[](void* p) { *((bool*)p) = true; });
sp->set_call_back(mock_callback.point_name, mock_callback.func);
});
sp->enable_processing();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
sp->disable_processing();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable& mock_callback) {
sp->clear_call_back(mock_callback.point_name);
});
return_error_for_error_s3_client = false;
delete_objects_return_part_error = false;
});
std::string prefix = "test_delete_objects_by_prefix";
size_t num = 2000;
create_file_under_prefix(prefix, num);
delete_objects_return_part_error = true;
return_error_for_error_s3_client = true;
ASSERT_EQ(-1, accessor->delete_objects_by_prefix(prefix));
return_error_for_error_s3_client = false;
ASSERT_EQ(-2, accessor->delete_objects_by_prefix(prefix));
delete_objects_return_part_error = false;
ASSERT_EQ(-3, accessor->delete_objects_by_prefix(prefix));
}
} // namespace cloud