blob: f70023d0a72bd57be783d266c978d74943a65180 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/s3_accessor.h"
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h>
#include <aws/s3/model/GetBucketVersioningRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/LifecycleRule.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <utility>
#include "common/logging.h"
#include "common/sync_point.h"
namespace doris::cloud {
// Test-only sync-point plumbing. In production builds HELP_MACRO expands to
// nothing, so SYNC_POINT_HOOK_RETURN_VALUE(expr, ...) simply evaluates and
// returns `expr`. In UNIT_TEST builds, a registered sync point named
// `point_name` receives a pointer pair {&ret, &req} — it can inspect the
// outgoing request and inject a return value, short-circuiting the real call.
#ifndef UNIT_TEST
#define HELP_MACRO(ret, req, point_name)
#else
#define HELP_MACRO(ret, req, point_name) \
do { \
std::pair p {&ret, &req}; \
[[maybe_unused]] auto ret_pair = [&p]() mutable { \
TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \
return p; \
}(); \
return ret; \
} while (false)
#endif
// Wraps an SDK call so tests can intercept it by sync-point name. `t` is a
// default-constructed value of expr's type that the hook may overwrite.
#define SYNC_POINT_HOOK_RETURN_VALUE(expr, point_name, req) \
[&]() mutable { \
[[maybe_unused]] decltype((expr)) t; \
HELP_MACRO(t, req, point_name); \
return (expr); \
}()
// RAII guard around the process-wide AWS SDK lifecycle: Aws::InitAPI on
// construction, Aws::ShutdownAPI on destruction. Instantiated exactly once as
// a function-local static in S3Accessor::init().
class S3Environment {
public:
    S3Environment() { Aws::InitAPI(aws_options_); }
    ~S3Environment() { Aws::ShutdownAPI(aws_options_); }

    // Copying/moving this guard would pair a single InitAPI with multiple
    // ShutdownAPI calls, so both are forbidden.
    S3Environment(const S3Environment&) = delete;
    S3Environment& operator=(const S3Environment&) = delete;

private:
    Aws::SDKOptions aws_options_;
};
// Takes ownership of `conf` and precomputes the human-readable
// "endpoint/bucket/prefix" path used in diagnostics.
S3Accessor::S3Accessor(S3Conf conf) : conf_(std::move(conf)) {
    path_.append(conf_.endpoint)
            .append(1, '/')
            .append(conf_.bucket)
            .append(1, '/')
            .append(conf_.prefix);
}
S3Accessor::~S3Accessor() = default;
// Maps an accessor-relative path onto its full object key:
// "<conf_.prefix>/<relative_path>".
std::string S3Accessor::get_key(const std::string& relative_path) const {
    std::string key = conf_.prefix;
    key += '/';
    key += relative_path;
    return key;
}
// Inverse of get_key(): strips the leading "<conf_.prefix>/" from `key`.
// Returns an empty string when `key` does not start with that prefix.
std::string S3Accessor::get_relative_path(const std::string& key) const {
    const std::string expected_head = conf_.prefix + "/";
    if (key.find(expected_head) != 0) {
        return "";
    }
    return key.substr(expected_head.size());
}
int S3Accessor::init() {
static S3Environment s3_env;
Aws::Auth::AWSCredentials aws_cred(conf_.ak, conf_.sk);
Aws::Client::ClientConfiguration aws_config;
aws_config.endpointOverride = conf_.endpoint;
aws_config.region = conf_.region;
aws_config.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
/*maxRetries = 10, scaleFactor = 25*/);
s3_client_ = std::make_shared<Aws::S3::S3Client>(
std::move(aws_cred), std::move(aws_config),
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
true /* useVirtualAddressing */);
return 0;
}
// Deletes every object whose key starts with get_key(relative_path), paging
// through ListObjectsV2 and issuing one batched DeleteObjects call per page.
//
// Returns 0 on success, 1 when the server answers 403 (FORBIDDEN), and a
// negative value on any other failure.
int S3Accessor::delete_objects_by_prefix(const std::string& relative_path) {
    Aws::S3::Model::ListObjectsV2Request request;
    auto prefix = get_key(relative_path);
    request.WithBucket(conf_.bucket).WithPrefix(prefix);

    Aws::S3::Model::DeleteObjectsRequest delete_request;
    delete_request.SetBucket(conf_.bucket);
    bool is_truncated = false;
    do {
        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
                                                    "s3_client::list_objects_v2", request);
        if (!outcome.IsSuccess()) {
            LOG_WARNING("failed to list objects")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("prefix", prefix)
                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
                    .tag("error", outcome.GetError().GetMessage());
            if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::FORBIDDEN) {
                return 1;
            }
            return -1;
        }
        const auto& result = outcome.GetResult();
        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
        objects.reserve(result.GetContents().size());
        for (const auto& obj : result.GetContents()) {
            objects.emplace_back().SetKey(obj.GetKey());
            LOG_INFO("delete object")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("key", obj.GetKey());
        }
        if (!objects.empty()) {
            Aws::S3::Model::Delete del;
            del.WithObjects(std::move(objects)).SetQuiet(true);
            delete_request.SetDelete(std::move(del));
            auto delete_outcome =
                    SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
                                                 "s3_client::delete_objects", delete_request);
            if (!delete_outcome.IsSuccess()) {
                // BUGFIX: log the error from the failed *delete* outcome; the
                // previous code logged `outcome` (the successful list call),
                // hiding the real failure reason.
                LOG_WARNING("failed to delete objects")
                        .tag("endpoint", conf_.endpoint)
                        .tag("bucket", conf_.bucket)
                        .tag("prefix", prefix)
                        .tag("responseCode",
                             static_cast<int>(delete_outcome.GetError().GetResponseCode()))
                        .tag("error", delete_outcome.GetError().GetMessage());
                if (delete_outcome.GetError().GetResponseCode() ==
                    Aws::Http::HttpResponseCode::FORBIDDEN) {
                    return 1;
                }
                return -2;
            }
            if (!delete_outcome.GetResult().GetErrors().empty()) {
                // Quiet mode: the result only lists keys that failed to delete.
                const auto& e = delete_outcome.GetResult().GetErrors().front();
                LOG_WARNING("failed to delete object")
                        .tag("endpoint", conf_.endpoint)
                        .tag("bucket", conf_.bucket)
                        .tag("key", e.GetKey())
                        .tag("responseCode",
                             static_cast<int>(delete_outcome.GetError().GetResponseCode()))
                        .tag("error", e.GetMessage());
                return -3;
            }
        }
        is_truncated = result.GetIsTruncated();
        request.SetContinuationToken(result.GetNextContinuationToken());
    } while (is_truncated);
    return 0;
}
// Deletes the given objects (paths relative to conf_.prefix) in batches of at
// most 1000 keys, the DeleteObjects API limit.
//
// Returns 0 on success (including empty input), negative on failure.
int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths) {
    if (relative_paths.empty()) {
        return 0;
    }
    // `DeleteObjectsRequest` can only contain 1000 keys at most.
    constexpr size_t max_delete_batch = 1000;
    auto path_iter = relative_paths.begin();

    Aws::S3::Model::DeleteObjectsRequest delete_request;
    delete_request.SetBucket(conf_.bucket);
    do {
        Aws::S3::Model::Delete del;
        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
        auto path_begin = path_iter;
        // Cast the iterator difference so the comparison against the size_t
        // batch limit is sign-safe (the difference is non-negative here).
        for (; path_iter != relative_paths.end() &&
               static_cast<size_t>(path_iter - path_begin) < max_delete_batch;
             ++path_iter) {
            auto key = get_key(*path_iter);
            LOG_INFO("delete object")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("key", key);
            objects.emplace_back().SetKey(std::move(key));
        }
        // Defensive: never send an empty Delete payload.
        if (objects.empty()) {
            return 0;
        }
        del.WithObjects(std::move(objects)).SetQuiet(true);
        delete_request.SetDelete(std::move(del));
        auto delete_outcome =
                SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
                                             "s3_client::delete_objects", delete_request);
        if (!delete_outcome.IsSuccess()) {
            LOG_WARNING("failed to delete objects")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("key[0]", delete_request.GetDelete().GetObjects().front().GetKey())
                    .tag("responseCode",
                         static_cast<int>(delete_outcome.GetError().GetResponseCode()))
                    .tag("error", delete_outcome.GetError().GetMessage());
            return -1;
        }
        if (!delete_outcome.GetResult().GetErrors().empty()) {
            // Quiet mode: only keys that failed to delete come back.
            const auto& e = delete_outcome.GetResult().GetErrors().front();
            LOG_WARNING("failed to delete object")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("key", e.GetKey())
                    .tag("responseCode",
                         static_cast<int>(delete_outcome.GetError().GetResponseCode()))
                    .tag("error", e.GetMessage());
            return -2;
        }
    } while (path_iter != relative_paths.end());
    return 0;
}
int S3Accessor::delete_object(const std::string& relative_path) {
// TODO(cyx)
return 0;
}
// Uploads `content` as the object at `relative_path`.
// Returns 0 on success, -1 on failure.
int S3Accessor::put_object(const std::string& relative_path, const std::string& content) {
    auto key = get_key(relative_path);
    Aws::S3::Model::PutObjectRequest request;
    request.WithBucket(conf_.bucket).WithKey(key);

    auto body = Aws::MakeShared<Aws::StringStream>("S3Accessor");
    *body << content;
    request.SetBody(body);

    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
                                                "s3_client::put_object", request);
    if (outcome.IsSuccess()) {
        return 0;
    }
    LOG_WARNING("failed to put object")
            .tag("endpoint", conf_.endpoint)
            .tag("bucket", conf_.bucket)
            .tag("key", key)
            .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
            .tag("error", outcome.GetError().GetMessage());
    return -1;
}
// Lists every object under get_key(relative_path), appending one entry
// {key relative to conf_.prefix, object size} to `files` per object.
// Returns 0 on success, -1 on failure.
int S3Accessor::list(const std::string& relative_path, std::vector<ObjectMeta>* files) {
    auto prefix = get_key(relative_path);
    Aws::S3::Model::ListObjectsV2Request request;
    request.WithBucket(conf_.bucket).WithPrefix(prefix);

    bool has_more = false;
    do {
        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
                                                    "s3_client::list_objects_v2", request);
        if (!outcome.IsSuccess()) {
            LOG_WARNING("failed to list objects")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("prefix", prefix)
                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
                    .tag("error", outcome.GetError().GetMessage());
            return -1;
        }
        const auto& result = outcome.GetResult();
        VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
        for (const auto& obj : result.GetContents()) {
            // Strip "<conf_.prefix>/" so callers see accessor-relative paths.
            files->push_back({obj.GetKey().substr(conf_.prefix.size() + 1), obj.GetSize()});
        }
        has_more = result.GetIsTruncated();
        request.SetContinuationToken(result.GetNextContinuationToken());
    } while (has_more);
    return 0;
}
// Probes `relative_path` with a HeadObject request.
// Returns 0 if the object exists, 1 if it does not (404), -1 on other errors.
int S3Accessor::exist(const std::string& relative_path) {
    auto key = get_key(relative_path);
    Aws::S3::Model::HeadObjectRequest request;
    request.WithBucket(conf_.bucket).WithKey(key);
    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
                                                "s3_client::head_object", request);
    if (outcome.IsSuccess()) {
        return 0;
    }
    if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
        return 1;
    }
    LOG_WARNING("failed to head object")
            .tag("endpoint", conf_.endpoint)
            .tag("bucket", conf_.bucket)
            .tag("key", key)
            .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
            .tag("error", outcome.GetError().GetMessage());
    return -1;
}
// Scans objects under `relative_path` and deletes those whose last-modified
// time (epoch seconds) is older than `expired_time`, one listed page at a
// time. Returns 0 on success, otherwise the first non-zero error code.
int S3Accessor::delete_expired_objects(const std::string& relative_path, int64_t expired_time) {
    auto prefix = get_key(relative_path);
    Aws::S3::Model::ListObjectsV2Request request;
    request.WithBucket(conf_.bucket).WithPrefix(prefix);

    bool has_more = false;
    do {
        auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
                                                    "s3_client::list_objects_v2", request);
        if (!outcome.IsSuccess()) {
            LOG_WARNING("failed to list objects")
                    .tag("endpoint", conf_.endpoint)
                    .tag("bucket", conf_.bucket)
                    .tag("prefix", prefix)
                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
                    .tag("error", outcome.GetError().GetMessage());
            return -1;
        }
        const auto& result = outcome.GetResult();
        std::vector<std::string> expired_keys;
        for (const auto& obj : result.GetContents()) {
            if (obj.GetLastModified().Seconds() >= expired_time) {
                continue; // still within its lifetime
            }
            auto relative_key = get_relative_path(obj.GetKey());
            if (relative_key.empty()) {
                // Listed key unexpectedly lacks the configured prefix.
                LOG_WARNING("failed get relative path")
                        .tag("prefix", conf_.prefix)
                        .tag("key", obj.GetKey());
                continue;
            }
            expired_keys.push_back(relative_key);
            LOG_INFO("delete expired object")
                    .tag("prefix", conf_.prefix)
                    .tag("key", obj.GetKey())
                    .tag("relative_key", relative_key)
                    .tag("lastModifiedTime", obj.GetLastModified().Seconds())
                    .tag("expiredTime", expired_time);
        }
        if (auto ret = delete_objects(expired_keys); ret != 0) {
            return ret;
        }
        LOG_INFO("delete expired objects")
                .tag("endpoint", conf_.endpoint)
                .tag("bucket", conf_.bucket)
                .tag("prefix", conf_.prefix)
                .tag("num_scanned", result.GetContents().size())
                .tag("num_recycled", expired_keys.size());
        has_more = result.GetIsTruncated();
        request.SetContinuationToken(result.GetNextContinuationToken());
    } while (has_more);
    return 0;
}
// Fetches the bucket's lifecycle configuration and reports the noncurrent
// version expiration (in days) through `expiration_days`.
// Returns 0 when such a rule exists; -1 on request failure or when no rule
// sets a noncurrent-version expiration.
int S3Accessor::get_bucket_lifecycle(int64_t* expiration_days) {
    Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
    request.SetBucket(conf_.bucket);
    auto outcome =
            SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketLifecycleConfiguration(request),
                                         "s3_client::get_bucket_lifecycle_configuration", request);
    if (!outcome.IsSuccess()) {
        LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
                .tag("endpoint", conf_.endpoint)
                .tag("bucket", conf_.bucket)
                .tag("prefix", conf_.prefix)
                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
                .tag("error", outcome.GetError().GetMessage());
        return -1;
    }
    bool has_lifecycle = false;
    for (const auto& rule : outcome.GetResult().GetRules()) {
        // Scan every rule; the last matching one wins (same as before).
        if (rule.NoncurrentVersionExpirationHasBeenSet()) {
            has_lifecycle = true;
            *expiration_days = rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
        }
    }
    if (!has_lifecycle) {
        LOG_WARNING("Err for check interval: bucket doesn't have lifecycle configuration")
                .tag("endpoint", conf_.endpoint)
                .tag("bucket", conf_.bucket)
                .tag("prefix", conf_.prefix);
        return -1;
    }
    return 0;
}
// Verifies that bucket versioning is enabled on conf_.bucket.
// Returns 0 when the versioning status is Enabled, -1 otherwise (including
// request failure).
int S3Accessor::check_bucket_versioning() {
    Aws::S3::Model::GetBucketVersioningRequest request;
    request.SetBucket(conf_.bucket);
    auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketVersioning(request),
                                                "s3_client::get_bucket_versioning", request);
    if (!outcome.IsSuccess()) {
        LOG_WARNING("Err for check interval: failed to get status of bucket versioning")
                .tag("endpoint", conf_.endpoint)
                .tag("bucket", conf_.bucket)
                .tag("prefix", conf_.prefix)
                .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
                .tag("error", outcome.GetError().GetMessage());
        return -1;
    }
    if (outcome.GetResult().GetStatus() != Aws::S3::Model::BucketVersioningStatus::Enabled) {
        LOG_WARNING("Err for check interval: bucket doesn't enable bucket versioning")
                .tag("endpoint", conf_.endpoint)
                .tag("bucket", conf_.bucket)
                .tag("prefix", conf_.prefix);
        return -1;
    }
    return 0;
}
#undef HELP_MACRO
} // namespace doris::cloud