blob: a5a8977e17b5ff8c198da056d4381f9620cde61b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/s3_obj_client.h"
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h>
#include <aws/s3/model/GetBucketVersioningRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <ranges>
#include "common/config.h"
#include "common/logging.h"
#include "common/stopwatch.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#include "recycler/s3_accessor.h"
#include "recycler/util.h"
namespace doris::cloud {
// Builds the synthetic S3 error returned (instead of issuing a request) when the
// local rate limiter rejects a call. The error is INTERNAL_FAILURE with both the
// exception name and message set to "exceeds limit", and is marked non-retryable.
[[maybe_unused]] static Aws::Client::AWSError<Aws::S3::S3Errors> s3_error_factory() {
    const char* const reason = "exceeds limit";
    return Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::INTERNAL_FAILURE, reason,
                                                    reason, /*isRetryable=*/false);
}
// Invokes `callback` subject to the accessor rate limiter for operation class `op`.
//
// When `config::enable_s3_rate_limiter` is off, the callback runs directly.
// Otherwise `add(1)` is called on the limiter for `op`. A negative return is
// treated as a rejection: a failed outcome built from `s3_error_factory()` is
// returned and `callback` is never called.
template <typename Func>
auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) {
    using Outcome = decltype(callback());
    // Short-circuit keeps the limiter untouched when rate limiting is disabled.
    const bool rejected = config::enable_s3_rate_limiter &&
                          AccessorRateLimiter::instance().rate_limiter(op)->add(1) < 0;
    if (rejected) {
        return Outcome(s3_error_factory());
    }
    return callback();
}
// Runs `callback` under the GET limiter. In this file it wraps List/Head and
// GetBucket* calls.
template <typename Func>
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
    constexpr auto kind = S3RateLimitType::GET;
    return s3_rate_limit(kind, std::move(callback));
}
// Runs `callback` under the PUT limiter. In this file it wraps PutObject and
// the DeleteObject(s) calls.
template <typename Func>
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
    constexpr auto kind = S3RateLimitType::PUT;
    return s3_rate_limit(kind, std::move(callback));
}
// Lazily pages through `ListObjectsV2` results for a single bucket/prefix.
//
// Each fetched page is buffered in `results_` in reverse order, so `next()` can
// hand out objects in listing order with `pop_back()`. Any listing error marks
// the iterator invalid; after that `has_next()` always returns false.
class S3ObjListIterator final : public ObjectListIterator {
public:
    S3ObjListIterator(std::shared_ptr<Aws::S3::S3Client> client, std::string bucket,
                      std::string prefix, std::string endpoint)
            : client_(std::move(client)), endpoint_(std::move(endpoint)) {
        req_.WithBucket(std::move(bucket)).WithPrefix(std::move(prefix));
        TEST_SYNC_POINT_CALLBACK("S3ObjListIterator", &req_);
    }

    ~S3ObjListIterator() override = default;

    bool is_valid() override { return is_valid_; }

    // Returns true if `next()` will yield another object.
    // A truncated page that carries no keys is not treated as end-of-listing:
    // pages are requested until one yields objects, the listing is exhausted
    // (`has_more_` false), or an error invalidates the iterator.
    bool has_next() override {
        while (is_valid_ && results_.empty() && has_more_) {
            if (!fetch_next_page()) {
                is_valid_ = false;
            }
        }
        return is_valid_ && !results_.empty();
    }

    std::optional<ObjectMeta> next() override {
        std::optional<ObjectMeta> res;
        if (!has_next()) {
            return res;
        }
        res = std::move(results_.back());
        results_.pop_back();
        return res;
    }

private:
    // Issues one ListObjectsV2 request at the current continuation token and
    // appends the returned objects to `results_` in reverse order. Updates
    // `has_more_` and the request's continuation token. Returns false, after
    // logging a warning, on any error. The caller then invalidates the iterator.
    bool fetch_next_page() {
        auto outcome = s3_get_rate_limit([&]() {
            SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
            return client_->ListObjectsV2(req_);
        });
        if (!outcome.IsSuccess()) {
            LOG_WARNING("failed to list objects")
                    .tag("endpoint", endpoint_)
                    .tag("bucket", req_.GetBucket())
                    .tag("prefix", req_.GetPrefix())
                    .tag("responseCode", static_cast<int>(outcome.GetError().GetResponseCode()))
                    .tag("error", outcome.GetError().GetMessage());
            return false;
        }
        const auto& result = outcome.GetResult();
        const bool truncated = result.GetIsTruncated();
        const auto& next_token = result.GetNextContinuationToken();
        if (truncated && next_token.empty()) {
            LOG_WARNING("failed to list objects, isTruncated but no continuation token")
                    .tag("endpoint", endpoint_)
                    .tag("bucket", req_.GetBucket())
                    .tag("prefix", req_.GetPrefix());
            return false;
        }
        const auto& content = result.GetContents();
        if (truncated && content.empty() && next_token == req_.GetContinuationToken()) {
            // An empty page that does not advance the cursor would make
            // `has_next()` request the same page forever.
            LOG_WARNING("failed to list objects, empty truncated page without progress")
                    .tag("endpoint", endpoint_)
                    .tag("bucket", req_.GetBucket())
                    .tag("prefix", req_.GetPrefix());
            return false;
        }
        has_more_ = truncated;
        // `outcome` is a local that is not read again after this point, so its
        // strings are moved out rather than copied.
        req_.SetContinuationToken(std::move(const_cast<std::string&&>(next_token)));
        results_.reserve(content.size());
        for (auto&& obj : std::ranges::reverse_view(content)) {
            DCHECK(obj.GetKey().starts_with(req_.GetPrefix()))
                    << obj.GetKey() << ' ' << req_.GetPrefix();
            results_.emplace_back(
                    ObjectMeta {.key = std::move(const_cast<std::string&&>(obj.GetKey())),
                                .size = obj.GetSize(),
                                .mtime_s = obj.GetLastModified().Seconds()});
        }
        return true;
    }

    std::shared_ptr<Aws::S3::S3Client> client_;
    Aws::S3::Model::ListObjectsV2Request req_;
    // Buffered objects of the current page, stored in reverse listing order.
    std::vector<ObjectMeta> results_;
    std::string endpoint_;
    bool is_valid_ {true};
    // Whether the server reported more pages after the last fetched one.
    bool has_more_ {true};
};
// Largest number of keys sent in one `DeleteObjectsRequest` (the request
// accepts at most 1000 keys).
static constexpr size_t MaxDeleteBatch {1000};

// Defaulted destructor, defined out of line in this translation unit.
S3ObjClient::~S3ObjClient() = default;
// Uploads `stream` as the entire body of `path.key` in `path.bucket`.
// Returns 0 on success, or -1 after logging a warning if the request fails
// (including rejection by the rate limiter).
ObjectStorageResponse S3ObjClient::put_object(ObjectStoragePathRef path, std::string_view stream) {
    auto body = Aws::MakeShared<Aws::StringStream>("S3Accessor");
    *body << stream;

    Aws::S3::Model::PutObjectRequest put_req;
    put_req.WithBucket(path.bucket).WithKey(path.key);
    put_req.SetBody(body);

    auto put_outcome = s3_put_rate_limit([&]() {
        SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
        return s3_client_->PutObject(put_req);
    });
    if (put_outcome.IsSuccess()) {
        return 0;
    }
    const auto& err = put_outcome.GetError();
    LOG_WARNING("failed to put object")
            .tag("endpoint", endpoint_)
            .tag("bucket", path.bucket)
            .tag("key", path.key)
            .tag("responseCode", static_cast<int>(err.GetResponseCode()))
            .tag("error", err.GetMessage());
    return -1;
}
// Reads metadata of `path` with HeadObject.
// Returns 0 and fills `*res` (key, size, mtime in seconds) on success.
// Returns 1 on HTTP 404, leaving `*res` untouched.
// Returns -1 after logging a warning for any other failure.
ObjectStorageResponse S3ObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
    Aws::S3::Model::HeadObjectRequest head_req;
    head_req.WithBucket(path.bucket).WithKey(path.key);
    auto head_outcome = s3_get_rate_limit([&]() {
        SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
        return s3_client_->HeadObject(head_req);
    });

    if (!head_outcome.IsSuccess()) {
        const auto& err = head_outcome.GetError();
        if (err.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
            return 1;
        }
        LOG_WARNING("failed to head object")
                .tag("endpoint", endpoint_)
                .tag("bucket", path.bucket)
                .tag("key", path.key)
                .tag("responseCode", static_cast<int>(err.GetResponseCode()))
                .tag("error", err.GetMessage());
        return -1;
    }

    const auto& meta = head_outcome.GetResult();
    res->key = path.key;
    res->size = meta.GetContentLength();
    res->mtime_s = meta.GetLastModified().Seconds();
    return 0;
}
// Returns a lazy iterator over the objects listed under prefix `path.key` in
// `path.bucket`. No request is issued until the iterator is first queried.
std::unique_ptr<ObjectListIterator> S3ObjClient::list_objects(ObjectStoragePathRef path) {
    auto iter = std::make_unique<S3ObjListIterator>(s3_client_, path.bucket, path.key, endpoint_);
    return iter;
}
// Deletes `keys` from `bucket` in batches of at most `MaxDeleteBatch` keys.
// The batch size can be overridden through a test injection point. A batch
// holding a single key is sent as a plain DeleteObject.
// Processing stops at the first failing batch.
// Returns 0 if every key was deleted and non-zero otherwise. `option` is
// currently unused.
ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
                                                  std::vector<std::string> keys,
                                                  ObjClientOptions option) {
    if (keys.empty()) {
        return {0};
    }
    Aws::S3::Model::DeleteObjectsRequest delete_request;
    delete_request.SetBucket(bucket);
    // Sends one batch. Returns 0 on success or -1 if the request, or any key in
    // it, failed.
    auto issue_delete = [&bucket, &delete_request,
                         this](std::vector<Aws::S3::Model::ObjectIdentifier> objects) -> int {
        if (objects.size() == 1) {
            return delete_object({.bucket = bucket, .key = objects[0].GetKey()}).ret;
        }
        Aws::S3::Model::Delete del;
        // Quiet mode: the response lists only the keys that failed to delete.
        del.WithObjects(std::move(objects)).SetQuiet(true);
        delete_request.SetDelete(std::move(del));
        auto delete_outcome = s3_put_rate_limit([&]() {
            SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
            return s3_client_->DeleteObjects(delete_request);
        });
        if (!delete_outcome.IsSuccess()) {
            LOG_WARNING("failed to delete objects")
                    .tag("endpoint", endpoint_)
                    .tag("bucket", bucket)
                    .tag("key[0]", delete_request.GetDelete().GetObjects().front().GetKey())
                    .tag("responseCode",
                         static_cast<int>(delete_outcome.GetError().GetResponseCode()))
                    .tag("error", delete_outcome.GetError().GetMessage());
            return -1;
        }
        // A successful request can still contain per-key failures. They are
        // reported in the result's error list and must not be treated as success.
        const auto& errors = delete_outcome.GetResult().GetErrors();
        if (!errors.empty()) {
            const auto& first_error = errors.front();
            LOG_WARNING("failed to delete object")
                    .tag("endpoint", endpoint_)
                    .tag("bucket", bucket)
                    .tag("key", first_error.GetKey())
                    .tag("code", first_error.GetCode())
                    .tag("error", first_error.GetMessage())
                    .tag("num_errors", errors.size());
            return -1;
        }
        return 0;
    };

    int ret = 0;
    // `DeleteObjectsRequest` can only contain 1000 keys at most.
    std::vector<Aws::S3::Model::ObjectIdentifier> objects;
    size_t delete_batch_size = MaxDeleteBatch;
    TEST_INJECTION_POINT_CALLBACK("S3ObjClient::delete_objects", &delete_batch_size);
    for (auto&& key : keys) {
        objects.emplace_back().SetKey(std::move(key));
        if (objects.size() < delete_batch_size) {
            continue;
        }
        ret = issue_delete(std::move(objects));
        // `objects` was moved from; make the empty state explicit before reuse.
        objects.clear();
        if (ret != 0) {
            return {ret};
        }
    }
    if (!objects.empty()) {
        ret = issue_delete(std::move(objects));
    }
    return {ret};
}
// Deletes a single object with DeleteObject.
// Returns OK on success. On failure, logs a warning and returns NOT_FOUND for
// HTTP 404 or UNDEFINED otherwise, carrying the S3 error message in both cases.
ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) {
    Aws::S3::Model::DeleteObjectRequest del_req;
    del_req.WithBucket(path.bucket).WithKey(path.key);
    auto del_outcome = s3_put_rate_limit([&]() {
        SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
        return s3_client_->DeleteObject(del_req);
    });
    TEST_SYNC_POINT_CALLBACK("S3ObjClient::delete_object", &del_outcome);

    if (del_outcome.IsSuccess()) {
        return {ObjectStorageResponse::OK};
    }

    const auto& err = del_outcome.GetError();
    LOG_WARNING("failed to delete object")
            .tag("endpoint", endpoint_)
            .tag("bucket", path.bucket)
            .tag("key", path.key)
            .tag("responseCode", static_cast<int>(err.GetResponseCode()))
            .tag("error", err.GetMessage())
            .tag("exception", err.GetExceptionName());
    const bool missing = err.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND;
    return {missing ? ObjectStorageResponse::NOT_FOUND : ObjectStorageResponse::UNDEFINED,
            err.GetMessage()};
}
// Deletes the objects under `path` by delegating to the shared
// `delete_objects_recursively_` helper, using `MaxDeleteBatch` as the batch
// size. The helper defines how `option` and `expiration_time` are interpreted.
ObjectStorageResponse S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path,
                                                              ObjClientOptions option,
                                                              int64_t expiration_time) {
    constexpr size_t batch_size = MaxDeleteBatch;
    return delete_objects_recursively_(path, option, expiration_time, batch_size);
}
// Looks for a lifecycle rule on `bucket` that sets NoncurrentVersionExpiration
// and writes its NoncurrentDays to `*expiration_days`. If several rules set it,
// the last one in the returned rule list wins.
// Returns 0 on success. Returns -1 after logging a warning if the request fails
// or no such rule exists.
ObjectStorageResponse S3ObjClient::get_life_cycle(const std::string& bucket,
                                                  int64_t* expiration_days) {
    Aws::S3::Model::GetBucketLifecycleConfigurationRequest lc_req;
    lc_req.SetBucket(bucket);
    auto lc_outcome = s3_get_rate_limit(
            [&]() { return s3_client_->GetBucketLifecycleConfiguration(lc_req); });

    if (!lc_outcome.IsSuccess()) {
        LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
                .tag("endpoint", endpoint_)
                .tag("bucket", bucket)
                .tag("responseCode", static_cast<int>(lc_outcome.GetError().GetResponseCode()))
                .tag("error", lc_outcome.GetError().GetMessage());
        return -1;
    }

    bool found = false;
    for (const auto& rule : lc_outcome.GetResult().GetRules()) {
        if (!rule.NoncurrentVersionExpirationHasBeenSet()) {
            continue;
        }
        found = true;
        *expiration_days = rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
    }

    if (!found) {
        LOG_WARNING("Err for check interval: bucket doesn't have lifecycle configuration")
                .tag("endpoint", endpoint_)
                .tag("bucket", bucket);
        return -1;
    }
    return 0;
}
// Verifies that bucket versioning is Enabled on `bucket`.
// Returns 0 if it is. Returns -1 after logging a warning if the status request
// fails or the status is anything other than Enabled.
ObjectStorageResponse S3ObjClient::check_versioning(const std::string& bucket) {
    Aws::S3::Model::GetBucketVersioningRequest ver_req;
    ver_req.SetBucket(bucket);
    auto ver_outcome =
            s3_get_rate_limit([&]() { return s3_client_->GetBucketVersioning(ver_req); });

    if (!ver_outcome.IsSuccess()) {
        LOG_WARNING("Err for check interval: failed to get status of bucket versioning")
                .tag("endpoint", endpoint_)
                .tag("bucket", bucket)
                .tag("responseCode", static_cast<int>(ver_outcome.GetError().GetResponseCode()))
                .tag("error", ver_outcome.GetError().GetMessage());
        return -1;
    }

    const bool enabled = ver_outcome.GetResult().GetStatus() ==
                         Aws::S3::Model::BucketVersioningStatus::Enabled;
    if (!enabled) {
        LOG_WARNING("Err for check interval: bucket doesn't enable bucket versioning")
                .tag("endpoint", endpoint_)
                .tag("bucket", bucket);
        return -1;
    }
    return 0;
}
} // namespace doris::cloud