// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/azure_obj_client.h"
#include <fmt/core.h>
#include <glog/logging.h>
#include <algorithm>
#include <azure/core/datetime.hpp>
#include <azure/core/io/body_stream.hpp>
#include <azure/storage/blobs.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/blobs/rest_client.hpp>
#include <azure/storage/common/storage_credential.hpp>
#include <azure/storage/common/storage_exception.hpp>
#include <iterator>
#include <ranges>
#include "common/config.h"
#include "common/logging.h"
#include "common/stopwatch.h"
#include "cpp/s3_rate_limiter.h"
#include "cpp/sync_point.h"
#include "recycler/s3_accessor.h"
#include "recycler/util.h"
using namespace Azure::Storage::Blobs;
namespace doris::cloud {
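// Applies the accessor-wide rate limiter to `callback` when
// `config::enable_s3_rate_limiter` is on; a negative value from `add(1)`
// means the request budget is exhausted, in which case we fail fast by
// throwing instead of issuing the request.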
template <typename Func>
auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) {
using T = decltype(callback());
if (!config::enable_s3_rate_limiter) {
return callback();
}
auto sleep_duration = AccessorRateLimiter::instance().rate_limiter(op)->add(1);
if (sleep_duration < 0) {
throw std::runtime_error("Azure exceeds request limit");
}
return callback();
}
template <typename Func>
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
return s3_rate_limit(S3RateLimitType::GET, std::move(callback));
}
template <typename Func>
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
return s3_rate_limit(S3RateLimitType::PUT, std::move(callback));
}
static constexpr size_t BlobBatchMaxOperations = 256;
static constexpr char BlobNotFound[] = "BlobNotFound";
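// Runs `f` and converts failures into an `ObjectStorageResponse`: Azure SDK
// exceptions are logged with their HTTP status and request id, any other
// exception with `what()`; both map to ret == -1. A normal return yields the
// default success response.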
template <typename Func>
ObjectStorageResponse do_azure_client_call(Func f, std::string_view url, std::string_view key) {
try {
f();
} catch (Azure::Core::RequestFailedException& e) {
auto msg = fmt::format(
"Azure request failed because {}, http_code: {}, request_id: {}, url: {}, "
"key: {}",
e.Message, static_cast<int>(e.StatusCode), e.RequestId, url, key);
LOG_WARNING(msg);
return {-1, std::move(msg)};
} catch (std::exception& e) {
auto msg = fmt::format("Azure request failed because {}, url: {}, key: {}", e.what(), url,
key);
LOG_WARNING(msg);
return {-1, std::move(msg)};
}
return {};
}
// The Unix epoch expressed as an `Azure::DateTime` (whose own epoch is 0001-01-01).
static const Azure::DateTime SystemClockEpoch {1970, 1, 1};
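// Lazily pages through `ListBlobs` results for a prefix. Each page is pushed
// into `results_` in reverse order so that `next()` can pop from the back and
// still yield keys in the ascending order the service returned them.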
class AzureListIterator final : public ObjectListIterator {
public:
AzureListIterator(std::shared_ptr<BlobContainerClient> client, std::string prefix)
: client_(std::move(client)), req_({.Prefix = std::move(prefix)}) {
TEST_SYNC_POINT_CALLBACK("AzureListIterator", &req_);
}
~AzureListIterator() override = default;
bool is_valid() override { return is_valid_; }
bool has_next() override {
if (!is_valid_) {
return false;
}
if (!results_.empty()) {
return true;
}
if (!has_more_) {
return false;
}
try {
auto resp = s3_get_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
return client_->ListBlobs(req_);
});
has_more_ = resp.NextPageToken.HasValue();
DCHECK(!(has_more_ && resp.Blobs.empty())) << has_more_ << ' ' << resp.Blobs.empty();
req_.ContinuationToken = std::move(resp.NextPageToken);
results_.reserve(resp.Blobs.size());
for (auto&& item : std::ranges::reverse_view(resp.Blobs)) {
DCHECK(item.Name.starts_with(*req_.Prefix)) << item.Name << ' ' << *req_.Prefix;
results_.emplace_back(ObjectMeta {
.key = std::move(item.Name),
.size = item.BlobSize,
// `Azure::DateTime` adds the offset of `SystemClockEpoch` to the given Unix timestamp,
// so here we need to subtract this offset to obtain the Unix timestamp of the mtime.
// https://github.com/Azure/azure-sdk-for-cpp/blob/azure-core_1.12.0/sdk/core/azure-core/inc/azure/core/datetime.hpp#L129
.mtime_s = duration_cast<std::chrono::seconds>(item.Details.LastModified -
SystemClockEpoch)
.count()});
}
} catch (Azure::Storage::StorageException& e) {
LOG_WARNING(
"Azure request failed because {}, http_code: {}, request_id: {}, url: {}, "
"prefix: {}",
e.Message, static_cast<int>(e.StatusCode), e.RequestId, client_->GetUrl(),
req_.Prefix.Value());
is_valid_ = false;
return false;
}
return !results_.empty();
}
std::optional<ObjectMeta> next() override {
std::optional<ObjectMeta> res;
if (!has_next()) {
return res;
}
res = std::move(results_.back());
results_.pop_back();
return res;
}
private:
std::shared_ptr<BlobContainerClient> client_;
ListBlobsOptions req_;
std::vector<ObjectMeta> results_;
bool is_valid_ {true};
bool has_more_ {true};
};
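// A minimal consumption sketch for the iterator (hypothetical caller code;
// `client` is an `AzureObjClient` and `path` an `ObjectStoragePathRef`):
//   auto iter = client.list_objects(path);
//   while (auto meta = iter->next()) {
//       // use meta->key, meta->size, meta->mtime_s
//   }
//   if (!iter->is_valid()) { /* listing stopped because of an Azure error */ }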
AzureObjClient::~AzureObjClient() = default;
ObjectStorageResponse AzureObjClient::put_object(ObjectStoragePathRef path,
std::string_view stream) {
auto client = client_->GetBlockBlobClient(path.key);
return do_azure_client_call(
[&]() {
s3_put_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
return client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
stream.size());
});
},
client_->GetUrl(), path.key);
}
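// Return convention: ret == 0 with `res` filled when the blob exists,
// ret == 1 when it does not, and ret == -1 on any other Azure failure.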
ObjectStorageResponse AzureObjClient::head_object(ObjectStoragePathRef path, ObjectMeta* res) {
try {
auto&& properties = s3_get_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
return client_->GetBlockBlobClient(path.key).GetProperties().Value;
});
res->key = path.key;
        // `Azure::DateTime::time_since_epoch()` counts from 0001-01-01 rather than
        // the Unix epoch, so subtract `SystemClockEpoch` (as in `AzureListIterator`)
        // to store a Unix timestamp.
        res->mtime_s = std::chrono::duration_cast<std::chrono::seconds>(
                               properties.LastModified - SystemClockEpoch)
                               .count();
res->size = properties.BlobSize;
return {0};
} catch (Azure::Storage::StorageException& e) {
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
return {1};
}
auto msg = fmt::format(
"Head azure blob failed because {}, http_code: {}, request_id: {}, url: {}, "
"key: {}",
e.Message, static_cast<int>(e.StatusCode), e.RequestId, client_->GetUrl(),
path.key);
LOG_WARNING(msg);
return {-1, std::move(msg)};
}
}
std::unique_ptr<ObjectListIterator> AzureObjClient::list_objects(ObjectStoragePathRef path) {
return std::make_unique<AzureListIterator>(client_, path.key);
}
// Azure documents that each batch request supports a maximum of 256 subrequests; see
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
// > Each batch request supports a maximum of 256 subrequests.
ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
std::vector<std::string> keys,
ObjClientOptions option) {
if (keys.empty()) {
return {0};
}
    // TODO(ByteYue): use ranges to adapt this code when the compiler is ready
// auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
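    // A sketch of that ranges-based form, assuming C++23 `std::views::chunk`
    // (hypothetical; untested with the current toolchain):
    //   for (auto&& chunk : keys | std::views::chunk(BlobBatchMaxOperations)) {
    //       auto batch = client_->CreateBatch();
    //       for (auto&& key : chunk) { batch.DeleteBlob(key); }
    //       // submit the batch and check each deferred response as below
    //   }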
auto begin = std::begin(keys);
auto end = std::end(keys);
while (begin != end) {
auto batch = client_->CreateBatch();
auto chunk_end = begin;
size_t batch_size = BlobBatchMaxOperations;
TEST_SYNC_POINT_CALLBACK("AzureObjClient::delete_objects", &batch_size);
std::advance(chunk_end,
std::min(batch_size, static_cast<size_t>(std::distance(begin, end))));
std::vector<Azure::Storage::DeferredResponse<Models::DeleteBlobResult>> deferred_resps;
deferred_resps.reserve(std::distance(begin, chunk_end));
for (auto it = begin; it != chunk_end; ++it) {
deferred_resps.emplace_back(batch.DeleteBlob(*it));
}
auto resp = do_azure_client_call(
[&]() {
s3_put_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
return client_->SubmitBatch(batch);
});
},
client_->GetUrl(), *begin);
if (resp.ret != 0) {
return resp;
}
for (auto&& defer : deferred_resps) {
try {
auto r = defer.GetResponse();
if (!r.Value.Deleted) {
LOG_INFO("Azure batch delete failed, url {}", client_->GetUrl());
return {-1};
}
} catch (Azure::Storage::StorageException& e) {
                if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound &&
                    e.ErrorCode == BlobNotFound) {
                    continue; // the blob is already gone; treat as success
                }
auto msg = fmt::format(
"Azure request failed because {}, http code {}, request id {}, url {}",
e.Message, static_cast<int>(e.StatusCode), e.RequestId, client_->GetUrl());
LOG_WARNING(msg);
return {-1, std::move(msg)};
}
}
begin = chunk_end;
}
return {0};
}
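// `DeleteBlob` reporting `Deleted == false` is surfaced by throwing from the
// lambda; `do_azure_client_call` then converts it into a ret == -1 response.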
ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
return do_azure_client_call(
[&]() {
if (auto r = s3_put_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
return client_->DeleteBlob(path.key);
});
!r.Value.Deleted) {
throw std::runtime_error("Delete azure blob failed");
}
},
client_->GetUrl(), path.key);
}
ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path,
ObjClientOptions option,
int64_t expiration_time) {
return delete_objects_recursively_(path, option, expiration_time, BlobBatchMaxOperations);
}
ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket,
int64_t* expiration_days) {
// TODO(plat1ko)
*expiration_days = INT64_MAX;
return {0};
}
ObjectStorageResponse AzureObjClient::check_versioning(const std::string& bucket) {
// TODO(plat1ko)
return {0};
}
} // namespace doris::cloud