blob: 04b294814940211c64d68a249659450c3bbd43ed [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <aws/core/Aws.h>
#include <bvar/latency_recorder.h>
#include <array>
#include <cstdint>
#include <memory>
#include "cpp/aws_common.h"
#include "recycler/obj_storage_client.h"
#include "recycler/storage_vault_accessor.h"
namespace Aws::S3 {
class S3Client;
} // namespace Aws::S3
namespace doris {
class S3RateLimiterHolder;
enum class S3RateLimitType;
namespace cloud {
class ObjectStoreInfoPB;
class SimpleThreadPool;
namespace s3_bvar {
extern bvar::LatencyRecorder s3_get_latency;
extern bvar::LatencyRecorder s3_put_latency;
extern bvar::LatencyRecorder s3_delete_object_latency;
extern bvar::LatencyRecorder s3_delete_objects_latency;
extern bvar::LatencyRecorder s3_head_latency;
extern bvar::LatencyRecorder s3_multi_part_upload_latency;
extern bvar::LatencyRecorder s3_list_latency;
extern bvar::LatencyRecorder s3_list_object_versions_latency;
extern bvar::LatencyRecorder s3_get_bucket_version_latency;
extern bvar::LatencyRecorder s3_copy_object_latency;
}; // namespace s3_bvar
class S3Environment {
public:
S3Environment(const S3Environment&) = delete;
S3Environment& operator=(const S3Environment&) = delete;
static S3Environment& getInstance();
static Aws::Client::ClientConfiguration& getClientConfiguration() {
// The default constructor of ClientConfiguration will do some http call
// such as Aws::Internal::GetEC2MetadataClient and other init operation,
// which is unnecessary.
// So here we use a static instance, and deep copy every time
// to avoid unnecessary operations.
static Aws::Client::ClientConfiguration instance;
return instance;
}
~S3Environment();
private:
S3Environment();
Aws::SDKOptions aws_options_;
};
struct AccessorRateLimiter {
public:
~AccessorRateLimiter() = default;
static AccessorRateLimiter& instance();
S3RateLimiterHolder* rate_limiter(S3RateLimitType type);
private:
AccessorRateLimiter();
std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
};
struct S3Conf {
std::string ak;
std::string sk;
std::string endpoint;
std::string region;
std::string bucket;
std::string prefix;
bool use_virtual_addressing {true};
CredProviderType cred_provider_type = CredProviderType::Default;
std::string role_arn;
std::string external_id;
enum Provider : uint8_t {
S3,
GCS,
AZURE,
};
Provider provider;
static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB& obj_info,
bool skip_aksk = false);
};
class S3Accessor : public StorageVaultAccessor {
public:
explicit S3Accessor(S3Conf conf);
~S3Accessor() override;
// returns 0 for success otherwise error
static int create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor);
// returns 0 for success otherwise error
int init();
int delete_prefix(const std::string& path_prefix, int64_t expiration_time = 0) override;
int delete_directory(const std::string& dir_path) override;
int delete_all(int64_t expiration_time = 0) override;
int delete_files(const std::vector<std::string>& paths) override;
int delete_file(const std::string& path) override;
int list_directory(const std::string& dir_path, std::unique_ptr<ListIterator>* res) override;
int list_all(std::unique_ptr<ListIterator>* res) override;
int put_file(const std::string& path, const std::string& content) override;
int exists(const std::string& path) override;
int abort_multipart_upload(const std::string& path, const std::string& upload_id) override;
// Get the objects' expiration time on the conf.bucket
// returns 0 for success otherwise error
int get_life_cycle(int64_t* expiration_days);
// Check if the objects' versioning is on or off
// returns 0 when versioning is on, otherwise versioning is off or check failed
int check_versioning();
protected:
int list_prefix(const std::string& path_prefix, std::unique_ptr<ListIterator>* res);
virtual int delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time = 0);
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _get_aws_credentials_provider_v1(
const S3Conf& s3_conf);
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> _get_aws_credentials_provider_v2(
const S3Conf& s3_conf);
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> get_aws_credentials_provider(
const S3Conf& s3_conf);
std::string get_key(const std::string& relative_path) const;
std::string to_uri(const std::string& relative_path) const;
S3Conf conf_;
std::shared_ptr<ObjStorageClient> obj_client_;
std::string _ca_cert_file_path;
};
class GcsAccessor final : public S3Accessor {
public:
explicit GcsAccessor(S3Conf conf) : S3Accessor(std::move(conf)) {}
~GcsAccessor() override = default;
int delete_files(const std::vector<std::string>& paths) override;
private:
int delete_prefix_impl(const std::string& path_prefix, int64_t expiration_time = 0) override;
};
} // namespace cloud
} // namespace doris