blob: 7930e31ff1511a6280c8e10abce028b71ade68ad [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "io/fs/s3_obj_storage_client.h"
#include <aws/core/client/AWSError.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/http/URI.h>
#include <aws/core/utils/Array.h>
#include <aws/core/utils/HashingUtils.h>
#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/memory/stl/AWSAllocator.h>
#include <aws/core/utils/memory/stl/AWSMap.h>
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/utils/memory/stl/AWSStringStream.h>
#include <aws/core/utils/memory/stl/AWSVector.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/AbortMultipartUploadResult.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadResult.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CopyObjectResult.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CreateMultipartUploadResult.h>
#include <aws/s3/model/Delete.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectResult.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/DeleteObjectsResult.h>
#include <aws/s3/model/Error.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/GetObjectResult.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ListObjectsV2Result.h>
#include <aws/s3/model/Object.h>
#include <aws/s3/model/ObjectIdentifier.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/PutObjectResult.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/UploadPartResult.h>
#include <algorithm>
#include <ranges>
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/fs/err_utils.h"
#include "io/fs/s3_common.h"
#include "util/bvar_helper.h"
namespace {
inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
return {::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
}
template <typename Func>
auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
using T = decltype(callback());
if (!doris::config::enable_s3_rate_limiter) {
return callback();
}
auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
if (sleep_duration < 0) {
return T(s3_error_factory());
}
return callback();
}
template <typename Func>
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
}
template <typename Func>
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
}
} // namespace
namespace Aws::S3::Model {
class DeleteObjectRequest;
} // namespace Aws::S3::Model
using Aws::S3::Model::CompletedPart;
using Aws::S3::Model::CompletedMultipartUpload;
using Aws::S3::Model::CompleteMultipartUploadRequest;
using Aws::S3::Model::CreateMultipartUploadRequest;
using Aws::S3::Model::UploadPartRequest;
using Aws::S3::Model::UploadPartOutcome;
namespace doris::io {
using namespace Aws::S3::Model;
static constexpr int S3_REQUEST_THRESHOLD_MS = 5000;
ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
const ObjectStoragePathOptions& opts) {
CreateMultipartUploadRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
request.SetContentType("application/octet-stream");
MonotonicStopWatch watch;
watch.start();
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
s3_put_rate_limit([&]() { return _client->CreateMultipartUpload(request); }),
"s3_file_writer::create_multi_part_upload", std::cref(request).get());
SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
watch.stop();
s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
const auto& request_id = outcome.IsSuccess() ? outcome.GetResult().GetRequestId()
: outcome.GetError().GetRequestId();
LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
<< "CreateMultipartUpload cost=" << watch.elapsed_time_milliseconds() << "ms"
<< ", request_id=" << request_id << ", bucket=" << opts.bucket << ", key=" << opts.key;
if (!outcome.IsSuccess()) {
auto st = s3fs_error(outcome.GetError(), fmt::format("failed to CreateMultipartUpload: {} ",
opts.path.native()));
LOG(WARNING) << st << " request_id=" << request_id;
return ObjectStorageUploadResponse {
.resp = {convert_to_obj_response(std::move(st)),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()},
};
}
return ObjectStorageUploadResponse {.upload_id {outcome.GetResult().GetUploadId()}};
}
ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOptions& opts,
std::string_view stream) {
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
auto string_view_stream = std::make_shared<StringViewStream>(stream.data(), stream.size());
Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*string_view_stream));
request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
request.SetBody(string_view_stream);
request.SetContentLength(stream.size());
request.SetContentType("application/octet-stream");
MonotonicStopWatch watch;
watch.start();
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
s3_put_rate_limit([&]() { return _client->PutObject(request); }),
"s3_file_writer::put_object", std::cref(request).get(), &stream);
watch.stop();
s3_bvar::s3_put_latency << watch.elapsed_time_microseconds();
const auto& request_id = outcome.IsSuccess() ? outcome.GetResult().GetRequestId()
: outcome.GetError().GetRequestId();
if (!outcome.IsSuccess()) {
auto st = s3fs_error(outcome.GetError(),
fmt::format("failed to put object: {}", opts.path.native()));
LOG(WARNING) << st << ", request_id=" << request_id;
return ObjectStorageResponse {convert_to_obj_response(std::move(st)),
static_cast<int>(outcome.GetError().GetResponseCode()),
request_id};
}
LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
<< "PutObject cost=" << watch.elapsed_time_milliseconds() << "ms"
<< ", request_id=" << request_id << ", bucket=" << opts.bucket << ", key=" << opts.key;
return ObjectStorageResponse::OK();
}
ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStoragePathOptions& opts,
std::string_view stream, int part_num) {
UploadPartRequest request;
request.WithBucket(opts.bucket)
.WithKey(opts.key)
.WithPartNumber(part_num)
.WithUploadId(*opts.upload_id);
auto string_view_stream = std::make_shared<StringViewStream>(stream.data(), stream.size());
request.SetBody(string_view_stream);
Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*string_view_stream));
request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
request.SetContentLength(stream.size());
request.SetContentType("application/octet-stream");
MonotonicStopWatch watch;
watch.start();
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
s3_put_rate_limit([&]() { return _client->UploadPart(request); }),
"s3_file_writer::upload_part", std::cref(request).get(), &stream);
watch.stop();
s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
const auto& request_id = outcome.IsSuccess() ? outcome.GetResult().GetRequestId()
: outcome.GetError().GetRequestId();
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", &outcome);
if (!outcome.IsSuccess()) {
auto st = Status::IOError(
"failed to UploadPart bucket={}, key={}, part_num={}, upload_id={}, message={}, "
"exception_name={}, response_code={}, request_id={}",
opts.bucket, opts.path.native(), part_num, *opts.upload_id,
outcome.GetError().GetMessage(), outcome.GetError().GetExceptionName(),
outcome.GetError().GetResponseCode(), request_id);
LOG(WARNING) << st << ", request_id=" << request_id;
return ObjectStorageUploadResponse {
.resp = {convert_to_obj_response(std::move(st)),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()}};
}
LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
<< "UploadPart cost=" << watch.elapsed_time_milliseconds() << "ms"
<< ", request_id=" << request_id << ", bucket=" << opts.bucket << ", key=" << opts.key
<< ", part_num=" << part_num << ", upload_id=" << *opts.upload_id;
return ObjectStorageUploadResponse {.etag = outcome.GetResult().GetETag()};
}
ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
const ObjectStoragePathOptions& opts,
const std::vector<ObjectCompleteMultiPart>& completed_parts) {
CompleteMultipartUploadRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key).WithUploadId(*opts.upload_id);
CompletedMultipartUpload completed_upload;
std::vector<CompletedPart> complete_parts;
std::ranges::transform(completed_parts, std::back_inserter(complete_parts),
[](const ObjectCompleteMultiPart& part_ptr) {
CompletedPart part;
part.SetPartNumber(part_ptr.part_num);
part.SetETag(part_ptr.etag);
return part;
});
completed_upload.SetParts(std::move(complete_parts));
request.WithMultipartUpload(completed_upload);
TEST_SYNC_POINT_RETURN_WITH_VALUE("S3FileWriter::_complete:3", ObjectStorageResponse(), this);
MonotonicStopWatch watch;
watch.start();
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
s3_put_rate_limit([&]() { return _client->CompleteMultipartUpload(request); }),
"s3_file_writer::complete_multi_part", std::cref(request).get());
watch.stop();
s3_bvar::s3_multi_part_upload_latency << watch.elapsed_time_microseconds();
const auto& request_id = outcome.IsSuccess() ? outcome.GetResult().GetRequestId()
: outcome.GetError().GetRequestId();
if (!outcome.IsSuccess()) {
auto st = s3fs_error(outcome.GetError(),
fmt::format("failed to CompleteMultipartUpload: {}, upload_id={}",
opts.path.native(), *opts.upload_id));
LOG(WARNING) << st << ", request_id=" << request_id;
return {convert_to_obj_response(std::move(st)),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()};
}
LOG_IF(INFO, watch.elapsed_time_milliseconds() > S3_REQUEST_THRESHOLD_MS)
<< "CompleteMultipartUpload cost=" << watch.elapsed_time_milliseconds() << "ms"
<< ", request_id=" << request_id << ", bucket=" << opts.bucket << ", key=" << opts.key
<< ", upload_id=" << *opts.upload_id;
return ObjectStorageResponse::OK();
}
ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
Aws::S3::Model::HeadObjectRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
s3_get_rate_limit([&]() { return _client->HeadObject(request); }),
"s3_file_system::head_object", std::ref(request).get());
if (outcome.IsSuccess()) {
return {.resp = {convert_to_obj_response(Status::OK())},
.file_size = outcome.GetResult().GetContentLength()};
} else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
return {.resp = {convert_to_obj_response(Status::Error<ErrorCode::NOT_FOUND, false>(""))}};
} else {
return {.resp = {convert_to_obj_response(
s3fs_error(outcome.GetError(),
fmt::format("failed to check exists {}", opts.key))),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()}};
}
}
ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOptions& opts,
void* buffer, size_t offset, size_t bytes_read,
size_t* size_return) {
Aws::S3::Model::GetObjectRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
request.SetRange(fmt::format("bytes={}-{}", offset, offset + bytes_read - 1));
request.SetResponseStreamFactory(AwsWriteableStreamFactory(buffer, bytes_read));
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
auto outcome = s3_get_rate_limit([&]() { return _client->GetObject(request); });
if (!outcome.IsSuccess()) {
return {convert_to_obj_response(
s3fs_error(outcome.GetError(),
fmt::format("failed to read from {}", opts.path.native()))),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()};
}
*size_return = outcome.GetResult().GetContentLength();
// case for incomplete read
SYNC_POINT_CALLBACK("s3_obj_storage_client::get_object", size_return);
if (*size_return != bytes_read) {
return {convert_to_obj_response(Status::InternalError(
"failed to read from {}(bytes read: {}, bytes req: {}), request_id: {}",
opts.path.native(), *size_return, bytes_read, outcome.GetResult().GetRequestId()))};
}
return ObjectStorageResponse::OK();
}
ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOptions& opts,
std::vector<FileInfo>* files) {
Aws::S3::Model::ListObjectsV2Request request;
request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
bool is_trucated = false;
do {
Aws::S3::Model::ListObjectsV2Outcome outcome;
{
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
}
if (!outcome.IsSuccess()) {
files->clear();
return {convert_to_obj_response(s3fs_error(
outcome.GetError(), fmt::format("failed to list {}", opts.prefix))),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()};
}
for (const auto& obj : outcome.GetResult().GetContents()) {
std::string key = obj.GetKey();
bool is_dir = (key.back() == '/');
FileInfo file_info;
file_info.file_name = obj.GetKey();
file_info.file_size = obj.GetSize();
file_info.is_file = !is_dir;
files->push_back(std::move(file_info));
}
is_trucated = outcome.GetResult().GetIsTruncated();
if (is_trucated && outcome.GetResult().GetNextContinuationToken().empty()) {
return {convert_to_obj_response(
Status::InternalError("failed to list {}, is_trucated is true, but next "
"continuation token is empty, request_id={}",
opts.prefix, outcome.GetResult().GetRequestId()))};
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
} while (is_trucated);
return ObjectStorageResponse::OK();
}
ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts,
std::vector<std::string> objs) {
Aws::S3::Model::DeleteObjectsRequest delete_request;
delete_request.SetBucket(opts.bucket);
Aws::S3::Model::Delete del;
Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
std::ranges::transform(objs, std::back_inserter(objects), [](auto&& obj_key) {
Aws::S3::Model::ObjectIdentifier obj_identifier;
obj_identifier.SetKey(std::move(obj_key));
return obj_identifier;
});
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
auto delete_outcome =
s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
if (!delete_outcome.IsSuccess()) {
return {convert_to_obj_response(
s3fs_error(delete_outcome.GetError(),
fmt::format("failed to delete dir {}", opts.key))),
static_cast<int>(delete_outcome.GetError().GetResponseCode()),
delete_outcome.GetError().GetRequestId()};
}
// case for partial delete object failure
SYNC_POINT_CALLBACK("s3_obj_storage_client::delete_objects", &delete_outcome);
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
return {convert_to_obj_response(
Status::InternalError("failed to delete object {}: {}, request_id={}", e.GetKey(),
e.GetMessage(), delete_outcome.GetResult().GetRequestId()))};
}
return ObjectStorageResponse::OK();
}
ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
Aws::S3::Model::DeleteObjectRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
auto outcome = s3_put_rate_limit([&]() { return _client->DeleteObject(request); });
if (outcome.IsSuccess() ||
outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
return ObjectStorageResponse::OK();
}
return {convert_to_obj_response(s3fs_error(outcome.GetError(),
fmt::format("failed to delete file {}", opts.key))),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()};
}
ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
const ObjectStoragePathOptions& opts) {
Aws::S3::Model::ListObjectsV2Request request;
request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
Aws::S3::Model::DeleteObjectsRequest delete_request;
delete_request.SetBucket(opts.bucket);
bool is_trucated = false;
do {
Aws::S3::Model::ListObjectsV2Outcome outcome;
{
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
}
if (!outcome.IsSuccess()) {
return {convert_to_obj_response(s3fs_error(
outcome.GetError(),
fmt::format("failed to list objects when delete dir {}", opts.prefix))),
static_cast<int>(outcome.GetError().GetResponseCode()),
outcome.GetError().GetRequestId()};
}
const auto& result = outcome.GetResult();
Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
objects.reserve(result.GetContents().size());
for (const auto& obj : result.GetContents()) {
objects.emplace_back().SetKey(obj.GetKey());
}
if (!objects.empty()) {
Aws::S3::Model::Delete del;
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
auto delete_outcome =
s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
if (!delete_outcome.IsSuccess()) {
return {convert_to_obj_response(
s3fs_error(delete_outcome.GetError(),
fmt::format("failed to delete dir {}", opts.key))),
static_cast<int>(delete_outcome.GetError().GetResponseCode()),
delete_outcome.GetError().GetRequestId()};
}
// case for partial delete object failure
SYNC_POINT_CALLBACK("s3_obj_storage_client::delete_objects_recursively",
&delete_outcome);
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
return {convert_to_obj_response(Status::InternalError(
"failed to delete object {}: {}, request_id={}", opts.key, e.GetMessage(),
delete_outcome.GetResult().GetRequestId()))};
}
}
is_trucated = result.GetIsTruncated();
request.SetContinuationToken(result.GetNextContinuationToken());
} while (is_trucated);
return ObjectStorageResponse::OK();
}
std::string S3ObjStorageClient::generate_presigned_url(const ObjectStoragePathOptions& opts,
int64_t expiration_secs,
const S3ClientConf&) {
return _client->GeneratePresignedUrl(opts.bucket, opts.key, Aws::Http::HttpMethod::HTTP_GET,
expiration_secs);
}
} // namespace doris::io