blob: f6591db5f28746cffaf57695a851fbd80528893d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "S3Processor.h"
#include <string>
#include <memory>
#include <utility>
#include "core/ProcessContext.h"
#include "S3Wrapper.h"
#include "AWSCredentialsService.h"
#include "properties/Properties.h"
#include "range/v3/algorithm/contains.hpp"
#include "utils/StringUtils.h"
#include "utils/HTTPUtils.h"
namespace org::apache::nifi::minifi::aws::processors {
S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
: core::Processor(name, uuid),
logger_(std::move(logger)) {
}
S3Processor::S3Processor(std::string_view name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger, std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
: core::Processor(name, uuid),
logger_(std::move(logger)),
s3_wrapper_(std::move(s3_request_sender)) {
}
std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentialsFromControllerService(core::ProcessContext& context) const {
std::string service_name;
if (!context.getProperty(AWSCredentialsProviderService, service_name) || service_name.empty()) {
return std::nullopt;
}
std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(service_name);
if (!service) {
logger_->log_error("AWS credentials service with name: '{}' could not be found", service_name);
return std::nullopt;
}
auto aws_credentials_service = std::dynamic_pointer_cast<minifi::aws::controllers::AWSCredentialsService>(service);
if (!aws_credentials_service) {
logger_->log_error("Controller service with name: '{}' is not an AWS credentials service", service_name);
return std::nullopt;
}
return aws_credentials_service->getAWSCredentials();
}
std::optional<Aws::Auth::AWSCredentials> S3Processor::getAWSCredentials(
core::ProcessContext& context,
const std::shared_ptr<core::FlowFile> &flow_file) {
auto service_cred = getAWSCredentialsFromControllerService(context);
if (service_cred) {
logger_->log_info("AWS Credentials successfully set from controller service");
return service_cred.value();
}
aws::AWSCredentialsProvider aws_credentials_provider;
std::string value;
if (context.getProperty(AccessKey, value, flow_file)) {
aws_credentials_provider.setAccessKey(value);
}
if (context.getProperty(SecretKey, value, flow_file)) {
aws_credentials_provider.setSecretKey(value);
}
if (context.getProperty(CredentialsFile, value)) {
aws_credentials_provider.setCredentialsFile(value);
}
bool use_default_credentials = false;
if (context.getProperty(UseDefaultCredentials, use_default_credentials)) {
aws_credentials_provider.setUseDefaultCredentials(use_default_credentials);
}
return aws_credentials_provider.getAWSCredentials();
}
std::optional<aws::s3::ProxyOptions> S3Processor::getProxy(core::ProcessContext& context, const std::shared_ptr<core::FlowFile> &flow_file) {
aws::s3::ProxyOptions proxy;
context.getProperty(ProxyHost, proxy.host, flow_file);
std::string port_str;
if (context.getProperty(ProxyPort, port_str, flow_file) && !port_str.empty() && !core::Property::StringToInt(port_str, proxy.port)) {
logger_->log_error("Proxy port invalid");
return std::nullopt;
}
context.getProperty(ProxyUsername, proxy.username, flow_file);
context.getProperty(ProxyPassword, proxy.password, flow_file);
if (!proxy.host.empty()) {
logger_->log_info("Proxy for S3Processor was set.");
}
return proxy;
}
void S3Processor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
client_config_ = Aws::Client::ClientConfiguration();
std::string value;
if (!context.getProperty(Bucket, value) || value.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bucket property missing or invalid");
}
if (!context.getProperty(Region, client_config_->region) || client_config_->region.empty() || !ranges::contains(region::REGIONS, client_config_->region)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Region property missing or invalid");
}
logger_->log_debug("S3Processor: Region [{}]", client_config_->region);
if (auto communications_timeout = context.getProperty<core::TimePeriodValue>(CommunicationsTimeout)) {
logger_->log_debug("S3Processor: Communications Timeout {}", communications_timeout->getMilliseconds());
client_config_->connectTimeoutMs = gsl::narrow<long>(communications_timeout->getMilliseconds().count()); // NOLINT(runtime/int)
} else {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Communications Timeout missing or invalid");
}
static const auto default_ca_file = minifi::utils::getDefaultCAFile();
if (default_ca_file) {
client_config_->caFile = *default_ca_file;
}
}
std::optional<CommonProperties> S3Processor::getCommonELSupportedProperties(
core::ProcessContext& context,
const std::shared_ptr<core::FlowFile> &flow_file) {
CommonProperties properties;
if (!context.getProperty(Bucket, properties.bucket, flow_file) || properties.bucket.empty()) {
logger_->log_error("Bucket '{}' is invalid or empty!", properties.bucket);
return std::nullopt;
}
logger_->log_debug("S3Processor: Bucket [{}]", properties.bucket);
auto credentials = getAWSCredentials(context, flow_file);
if (!credentials) {
logger_->log_error("AWS Credentials have not been set!");
return std::nullopt;
}
properties.credentials = credentials.value();
auto proxy = getProxy(context, flow_file);
if (!proxy) {
return std::nullopt;
}
properties.proxy = proxy.value();
context.getProperty(EndpointOverrideURL, properties.endpoint_override_url, flow_file);
if (!properties.endpoint_override_url.empty()) {
logger_->log_debug("S3Processor: Endpoint Override URL [{}]", properties.endpoint_override_url);
}
return properties;
}
} // namespace org::apache::nifi::minifi::aws::processors