blob: 9484ee495d9eaf8ace85e34a9db6d0c9a9e5172e [file] [log] [blame]
/**
* @file PutS3Object.h
* PutS3Object class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <sstream>
#include <utility>
#include <vector>
#include <memory>
#include <string>
#include <set>
#include <algorithm>
#include "aws/core/auth/AWSCredentialsProvider.h"
#include "S3Wrapper.h"
#include "core/Property.h"
#include "core/Processor.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/OptionalUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace aws {
namespace processors {
namespace region {
constexpr const char *AF_SOUTH_1 = "af-south-1";
constexpr const char *AP_EAST_1 = "ap-east-1";
constexpr const char *AP_NORTHEAST_1 = "ap-northeast-1";
constexpr const char *AP_NORTHEAST_2 = "ap-northeast-2";
constexpr const char *AP_NORTHEAST_3 = "ap-northeast-3";
constexpr const char *AP_SOUTH_1 = "ap-south-1";
constexpr const char *AP_SOUTHEAST_1 = "ap-southeast-1";
constexpr const char *AP_SOUTHEAST_2 = "ap-southeast-2";
constexpr const char *CA_CENTRAL_1 = "ca-central-1";
constexpr const char *CN_NORTH_1 = "cn-north-1";
constexpr const char *CN_NORTHWEST_1 = "cn-northwest-1";
constexpr const char *EU_CENTRAL_1 = "eu-central-1";
constexpr const char *EU_NORTH_1 = "eu-north-1";
constexpr const char *EU_SOUTH_1 = "eu-south-1";
constexpr const char *EU_WEST_1 = "eu-west-1";
constexpr const char *EU_WEST_2 = "eu-west-2";
constexpr const char *EU_WEST_3 = "eu-west-3";
constexpr const char *ME_SOUTH_1 = "me-south-1";
constexpr const char *SA_EAST_1 = "sa-east-1";
constexpr const char *US_EAST_1 = "us-east-1";
constexpr const char *US_EAST_2 = "us-east-2";
constexpr const char *US_GOV_EAST_1 = "us-gov-east-1";
constexpr const char *US_GOV_WEST_1 = "us-gov-west-1";
constexpr const char *US_WEST_1 = "us-west-1";
constexpr const char *US_WEST_2 = "us-west-2";
} // namespace region
class PutS3Object : public core::Processor {
public:
static constexpr char const* ProcessorName = "PutS3Object";
static const std::set<std::string> CANNED_ACLS;
static const std::set<std::string> REGIONS;
static const std::set<std::string> STORAGE_CLASSES;
static const std::set<std::string> SERVER_SIDE_ENCRYPTIONS;
// Supported Properties
static const core::Property ObjectKey;
static const core::Property Bucket;
static const core::Property ContentType;
static const core::Property AccessKey;
static const core::Property SecretKey;
static const core::Property CredentialsFile;
static const core::Property AWSCredentialsProviderService;
static const core::Property StorageClass;
static const core::Property ServerSideEncryption;
static const core::Property Region;
static const core::Property CommunicationsTimeout;
static const core::Property FullControlUserList;
static const core::Property ReadPermissionUserList;
static const core::Property ReadACLUserList;
static const core::Property WriteACLUserList;
static const core::Property CannedACL;
static const core::Property EndpointOverrideURL;
static const core::Property ProxyHost;
static const core::Property ProxyPort;
static const core::Property ProxyUsername;
static const core::Property ProxyPassword;
// Supported Relationships
static const core::Relationship Failure;
static const core::Relationship Success;
explicit PutS3Object(std::string name, minifi::utils::Identifier uuid = minifi::utils::Identifier())
: PutS3Object(name, uuid, minifi::utils::make_unique<aws::s3::S3Wrapper>()) {
}
explicit PutS3Object(std::string name, minifi::utils::Identifier uuid, std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper)
: core::Processor(std::move(name), uuid)
, s3_wrapper_(std::move(s3_wrapper)) {
}
~PutS3Object() override = default;
bool supportsDynamicProperties() override { return true; }
void initialize() override;
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
class ReadCallback : public InputStreamCallback {
public:
static const uint64_t MAX_SIZE;
static const uint64_t BUFFER_SIZE;
ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3WrapperBase* s3_wrapper)
: flow_size_(flow_size)
, options_(options)
, s3_wrapper_(s3_wrapper) {
}
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
if (flow_size_ > MAX_SIZE) {
return -1;
}
std::vector<uint8_t> buffer;
auto data_stream = std::make_shared<std::stringstream>();
buffer.reserve(BUFFER_SIZE);
read_size_ = 0;
while (read_size_ < flow_size_) {
auto next_read_size = std::min(flow_size_ - read_size_, BUFFER_SIZE);
int read_ret = stream->read(buffer.data(), next_read_size);
if (read_ret < 0) {
return -1;
}
if (read_ret > 0) {
data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size);
read_size_ += read_ret;
} else {
break;
}
}
result_ = s3_wrapper_->putObject(options_, data_stream);
return read_size_;
}
uint64_t flow_size_;
const minifi::aws::s3::PutObjectRequestParameters& options_;
aws::s3::S3WrapperBase* s3_wrapper_;
uint64_t read_size_ = 0;
minifi::utils::optional<minifi::aws::s3::PutObjectResult> result_ = minifi::utils::nullopt;
};
private:
minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromControllerService(const std::shared_ptr<core::ProcessContext> &context) const;
minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromProperties(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file) const;
minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentialsFromFile(const std::shared_ptr<core::ProcessContext> &context) const;
minifi::utils::optional<Aws::Auth::AWSCredentials> getAWSCredentials(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file) const;
void fillUserMetadata(const std::shared_ptr<core::ProcessContext> &context);
bool setProxy(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
bool getExpressionLanguageSupportedProperties(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
std::string parseAccessControlList(const std::string &comma_separated_list) const;
bool setCannedAcl(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
bool setAccessControl(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &flow_file);
void setAttributes(const std::shared_ptr<core::ProcessSession> &session, const std::shared_ptr<core::FlowFile> &flow_file, const minifi::aws::s3::PutObjectResult &put_object_result);
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<PutS3Object>::getLogger()};
aws::s3::PutObjectRequestParameters put_s3_request_params_;
std::unique_ptr<aws::s3::S3WrapperBase> s3_wrapper_;
std::string user_metadata_;
};
REGISTER_RESOURCE(PutS3Object, "This Processor puts FlowFiles to an Amazon S3 Bucket.");
} // namespace processors
} // namespace aws
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org