// blob: b59f5d23bda64c6032fe1abb4959ae9cf955fb5e [file] [log] [blame]
/**
* @file S3Wrapper.h
* S3Wrapper class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <string>
#include <map>
#include <unordered_map>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>
#include "aws/s3/model/StorageClass.h"
#include "aws/s3/model/ServerSideEncryption.h"
#include "aws/s3/model/ObjectCannedACL.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/AWSInitializer.h"
#include "utils/OptionalUtils.h"
#include "utils/StringUtils.h"
#include "io/BaseStream.h"
#include "S3RequestSender.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace aws {
namespace s3 {
/**
 * Lookup table from a storage-class name string to the matching
 * Aws::S3::Model::StorageClass enumerator.
 * Declared static const at namespace scope in a header, so every translation
 * unit that includes this file gets its own copy of the table.
 */
static const std::unordered_map<std::string, Aws::S3::Model::StorageClass> STORAGE_CLASS_MAP {
{"Standard", Aws::S3::Model::StorageClass::STANDARD},
{"ReducedRedundancy", Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY},
{"StandardIA", Aws::S3::Model::StorageClass::STANDARD_IA},
{"OnezoneIA", Aws::S3::Model::StorageClass::ONEZONE_IA},
{"IntelligentTiering", Aws::S3::Model::StorageClass::INTELLIGENT_TIERING},
{"Glacier", Aws::S3::Model::StorageClass::GLACIER},
{"DeepArchive", Aws::S3::Model::StorageClass::DEEP_ARCHIVE}
};
/**
 * Lookup table from an Aws::S3::Model::ObjectStorageClass enumerator to its
 * name string. The names are the same strings used as keys in
 * STORAGE_CLASS_MAP.
 */
static const std::map<Aws::S3::Model::ObjectStorageClass, std::string> OBJECT_STORAGE_CLASS_MAP {
{Aws::S3::Model::ObjectStorageClass::STANDARD, "Standard"},
{Aws::S3::Model::ObjectStorageClass::REDUCED_REDUNDANCY, "ReducedRedundancy"},
{Aws::S3::Model::ObjectStorageClass::STANDARD_IA, "StandardIA"},
{Aws::S3::Model::ObjectStorageClass::ONEZONE_IA, "OnezoneIA"},
{Aws::S3::Model::ObjectStorageClass::INTELLIGENT_TIERING, "IntelligentTiering"},
{Aws::S3::Model::ObjectStorageClass::GLACIER, "Glacier"},
{Aws::S3::Model::ObjectStorageClass::DEEP_ARCHIVE, "DeepArchive"}
};
/**
 * Lookup table from an Aws::S3::Model::ObjectVersionStorageClass enumerator
 * to its name string. Only STANDARD has an entry.
 * NOTE(review): presumably the SDK enum defines no other usable storage class
 * for object versions - confirm against the AWS SDK before adding entries.
 */
static const std::map<Aws::S3::Model::ObjectVersionStorageClass, std::string> VERSION_STORAGE_CLASS_MAP {
{Aws::S3::Model::ObjectVersionStorageClass::STANDARD, "Standard"}
};
/**
 * Lookup table from a server-side-encryption name string to the matching
 * Aws::S3::Model::ServerSideEncryption enumerator.
 * The name "None" maps to NOT_SET.
 */
static const std::unordered_map<std::string, Aws::S3::Model::ServerSideEncryption> SERVER_SIDE_ENCRYPTION_MAP {
{"None", Aws::S3::Model::ServerSideEncryption::NOT_SET},
{"AES256", Aws::S3::Model::ServerSideEncryption::AES256},
{"aws_kms", Aws::S3::Model::ServerSideEncryption::aws_kms},
};
/**
 * Lookup table from a canned-ACL name string to the matching
 * Aws::S3::Model::ObjectCannedACL enumerator.
 */
static const std::unordered_map<std::string, Aws::S3::Model::ObjectCannedACL> CANNED_ACL_MAP {
{"BucketOwnerFullControl", Aws::S3::Model::ObjectCannedACL::bucket_owner_full_control},
{"BucketOwnerRead", Aws::S3::Model::ObjectCannedACL::bucket_owner_read},
{"AuthenticatedRead", Aws::S3::Model::ObjectCannedACL::authenticated_read},
{"PublicReadWrite", Aws::S3::Model::ObjectCannedACL::public_read_write},
{"PublicRead", Aws::S3::Model::ObjectCannedACL::public_read},
{"Private", Aws::S3::Model::ObjectCannedACL::private_},
{"AwsExecRead", Aws::S3::Model::ObjectCannedACL::aws_exec_read},
};
/**
 * Expiration details of an object, held as two strings.
 * This is the return type of S3Wrapper::getExpiration and the type of
 * HeadObjectResult::expiration.
 */
struct Expiration {
// Expiration time as text. NOTE(review): exact format is not visible here - confirm in S3Wrapper.cpp.
std::string expiration_time;
// Identifier of the rule associated with the expiration time.
std::string expiration_time_rule_id;
};
/**
 * Value returned (wrapped in an optional) by S3Wrapper::putObject.
 * All fields are plain strings.
 * NOTE(review): presumably filled from the S3 put-object response - confirm in S3Wrapper.cpp.
 */
struct PutObjectResult {
// Object version string.
std::string version;
// Object ETag string.
std::string etag;
// Expiration information as a single string (not split like the Expiration struct).
std::string expiration;
// Name of the server-side-encryption algorithm.
std::string ssealgorithm;
};
/**
 * Credentials and client configuration carried by every S3 request.
 * The per-operation parameter structs in this header derive from it.
 */
struct RequestParameters {
  Aws::Auth::AWSCredentials credentials;
  Aws::Client::ClientConfiguration client_config;

  /**
   * Stores copies of the supplied credentials and client configuration.
   */
  RequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
      : credentials(creds),
        client_config(config) {
  }

  /**
   * Copies the endpoint override URL and the proxy host, port, user name and
   * password into client_config.
   */
  void setClientConfig(const aws::s3::ProxyOptions& proxy, const std::string& endpoint_override_url) {
    client_config.endpointOverride = endpoint_override_url;

    client_config.proxyHost = proxy.host;
    client_config.proxyPort = proxy.port;
    client_config.proxyUserName = proxy.username;
    client_config.proxyPassword = proxy.password;
  }
};
struct PutObjectRequestParameters : public RequestParameters {
PutObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string object_key;
std::string storage_class;
std::string server_side_encryption;
std::string content_type;
std::map<std::string, std::string> user_metadata_map;
std::string fullcontrol_user_list;
std::string read_permission_user_list;
std::string read_acl_user_list;
std::string write_acl_user_list;
std::string canned_acl;
};
struct DeleteObjectRequestParameters : public RequestParameters {
DeleteObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string object_key;
std::string version;
};
struct GetObjectRequestParameters : public RequestParameters {
GetObjectRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string object_key;
std::string version;
bool requester_pays = false;
};
/**
 * Value returned (wrapped in an optional) by S3Wrapper::headObject; also the
 * base of GetObjectResult.
 */
struct HeadObjectResult {
// Path-related strings.
// NOTE(review): presumably all three are filled by setFilePaths() - confirm in S3Wrapper.cpp.
std::string path;
std::string absolute_path;
std::string filename;
std::string mime_type;
std::string etag;
Expiration expiration;
// Name of the server-side-encryption algorithm.
std::string ssealgorithm;
std::string version;
std::map<std::string, std::string> user_metadata_map;
// Declared here, defined elsewhere.
// NOTE(review): presumably derives path/absolute_path/filename from the object key - confirm in S3Wrapper.cpp.
void setFilePaths(const std::string& key);
};
/**
 * Value returned (wrapped in an optional) by S3Wrapper::getObject.
 * Extends HeadObjectResult with a size field.
 */
struct GetObjectResult : public HeadObjectResult {
// Defaults to 0.
// NOTE(review): presumably the number of bytes written to the output stream - confirm in S3Wrapper.cpp.
int64_t write_size = 0;
};
struct ListRequestParameters : public RequestParameters {
ListRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
: RequestParameters(creds, config) {}
std::string bucket;
std::string delimiter;
std::string prefix;
bool use_versions = false;
uint64_t min_object_age = 0;
};
/**
 * Attributes of one listed entry; S3Wrapper::listBucket returns a vector of
 * these (wrapped in an optional).
 */
struct ListedObjectAttributes {
std::string filename;
std::string etag;
// Defaults to false.
bool is_latest = false;
// Defaults to 0. NOTE(review): time unit/epoch is not visible here - confirm in S3Wrapper.cpp.
int64_t last_modified = 0;
// Defaults to 0. NOTE(review): presumably the object size in bytes - confirm in S3Wrapper.cpp.
int64_t length = 0;
// Storage-class name. NOTE(review): presumably a value from OBJECT_STORAGE_CLASS_MAP or VERSION_STORAGE_CLASS_MAP - confirm.
std::string store_class;
std::string version;
};
// A head-object request is described by the same parameter struct as a get-object request.
using HeadObjectRequestParameters = GetObjectRequestParameters;
// A get-object-tags request is described by the same parameter struct as a
// delete-object request (bucket, object_key, version).
using GetObjectTagsParameters = DeleteObjectRequestParameters;
/**
 * Entry point for the S3 operations declared below: put, delete, get, head,
 * list and get-tags. Holds an S3RequestSender in request_sender_.
 * All member functions are defined out of line.
 * NOTE(review): presumably each operation is delegated to request_sender_ and
 * an empty optional / false return signals failure - confirm in S3Wrapper.cpp.
 */
class S3Wrapper {
public:
// Default constructor.
// NOTE(review): presumably installs a default S3RequestSender - confirm in S3Wrapper.cpp.
S3Wrapper();
// Constructs the wrapper from a caller-supplied request sender, passed as an rvalue unique_ptr.
explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
// Put-object operation; takes the request parameters and a shared stream holding the data.
minifi::utils::optional<PutObjectResult> putObject(const PutObjectRequestParameters& options, std::shared_ptr<Aws::IOStream> data_stream);
// Delete-object operation; reports its outcome as a bool.
bool deleteObject(const DeleteObjectRequestParameters& options);
// Get-object operation; fetched_body is a non-const stream reference.
// NOTE(review): presumably receives the object content - confirm in S3Wrapper.cpp.
minifi::utils::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& fetched_body);
// List operation; yields a vector of per-entry attributes.
// NOTE(review): presumably dispatches to listVersions()/listObjects() based on params.use_versions - confirm.
minifi::utils::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
// Get-tags operation; yields a string-to-string map.
minifi::utils::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
// Head-object operation; same parameter type as getObject, but takes no output stream.
minifi::utils::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
virtual ~S3Wrapper() = default;
private:
// Builds an Expiration from a single expiration string.
static Expiration getExpiration(const std::string& expiration);
// Takes the request by non-const reference together with a canned-ACL name.
// NOTE(review): presumably looks the name up in CANNED_ACL_MAP and sets it on the request - confirm.
void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
// NOTE(review): presumably copies data_size bytes from source to output and returns the count written - confirm.
static int64_t writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output);
// Returns a string for the given server-side-encryption enumerator.
static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption);
// Listing helpers with the same signature as listBucket().
minifi::utils::optional<std::vector<ListedObjectAttributes>> listVersions(const ListRequestParameters& params);
minifi::utils::optional<std::vector<ListedObjectAttributes>> listObjects(const ListRequestParameters& params);
// Overloads for SDK object-version entries and SDK object entries; listed_objects is an in/out vector.
// NOTE(review): presumably min_object_age filters out entries that are too recent - confirm.
void addListResults(const Aws::Vector<Aws::S3::Model::ObjectVersion>& content, uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects);
void addListResults(const Aws::Vector<Aws::S3::Model::Object>& content, uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects);
// Member templates declared here only; their definitions are not in this header section.
template<typename ListRequest>
ListRequest createListRequest(const ListRequestParameters& params);
template<typename FetchObjectRequest>
FetchObjectRequest createFetchObjectRequest(const GetObjectRequestParameters& get_object_params);
template<typename AwsResult, typename FetchObjectResult>
FetchObjectResult fillFetchObjectResult(const GetObjectRequestParameters& get_object_params, const AwsResult& fetch_object_result);
// Reference to the object returned by utils::AWSInitializer::get(). It is the first
// data member, so it is initialized before logger_ and request_sender_.
// NOTE(review): presumably guarantees the AWS SDK is initialized before any request is made - confirm.
const utils::AWSInitializer& AWS_INITIALIZER = utils::AWSInitializer::get();
std::shared_ptr<minifi::core::logging::Logger> logger_{minifi::core::logging::LoggerFactory<S3Wrapper>::getLogger()};
std::unique_ptr<S3RequestSender> request_sender_;
// Starts at 0. NOTE(review): presumably updated by the list operations - confirm in S3Wrapper.cpp.
uint64_t last_bucket_list_timestamp_ = 0;
};
} // namespace s3
} // namespace aws
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org