blob: 7c0524bd0caae8f00b53f1000c81a3c524739287 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ListS3.h"
#include <algorithm>
#include <memory>
#include <utility>
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
#include "utils/expected.h"
namespace org::apache::nifi::minifi::aws::processors {
void ListS3::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Reads the processor configuration and prepares everything onTrigger needs:
 * the listing state manager, the S3 list request parameters and the flags
 * that control which extra attributes are written to the flow files.
 *
 * @param context          source of the property values and the state manager
 * @param session_factory  forwarded unchanged to S3Processor::onSchedule
 * @throws Exception (PROCESSOR_EXCEPTION) when the context has no state manager
 * @throws Exception (PROCESS_SCHEDULE_EXCEPTION) when the common S3 properties
 *         cannot be resolved
 */
void ListS3::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
  S3Processor::onSchedule(context, session_factory);

  // The listing state is persisted through the context's state manager.
  auto context_state_manager = context.getStateManager();
  if (!context_state_manager) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }
  state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(context_state_manager);

  auto common_props = getCommonELSupportedProperties(context, nullptr);
  if (!common_props) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required property is not set or invalid");
  }

  // NOTE(review): orThrow presumably throws when the Bucket property is unset - confirm.
  auto bucket_name = context.getProperty(Bucket.name) | minifi::utils::orThrow("Required property");
  logger_->log_debug("S3Processor: Bucket [{}]", bucket_name);

  gsl_Expects(client_config_);
  list_request_params_ = std::make_unique<aws::s3::ListRequestParameters>(common_props->credentials, *client_config_);
  auto& request = *list_request_params_;
  request.setClientConfig(common_props->proxy, common_props->endpoint_override_url);
  request.bucket = bucket_name;

  // Delimiter and Prefix are only assigned when the property lookup yields a value.
  const auto delimiter_property = context.getProperty(Delimiter);
  if (delimiter_property.has_value()) {
    request.delimiter = *delimiter_property;
  }
  logger_->log_debug("ListS3: Delimiter [{}]", request.delimiter);

  const auto prefix_property = context.getProperty(Prefix);
  if (prefix_property.has_value()) {
    request.prefix = *prefix_property;
  }
  logger_->log_debug("ListS3: Prefix [{}]", request.prefix);

  request.use_versions = minifi::utils::parseBoolProperty(context, UseVersions);
  logger_->log_debug("ListS3: UseVersions [{}]", request.use_versions);

  request.min_object_age = minifi::utils::parseDurationProperty(context, MinimumObjectAge).count();
  logger_->log_debug("S3Processor: Minimum Object Age [{}]", request.min_object_age);

  write_object_tags_ = minifi::utils::parseBoolProperty(context, WriteObjectTags);
  logger_->log_debug("ListS3: WriteObjectTags [{}]", write_object_tags_);

  write_user_metadata_ = minifi::utils::parseBoolProperty(context, WriteUserMetadata);
  logger_->log_debug("ListS3: WriteUserMetadata [{}]", write_user_metadata_);

  requester_pays_ = minifi::utils::parseBoolProperty(context, RequesterPays);
  logger_->log_debug("ListS3: RequesterPays [{}]", requester_pays_);
}
/**
 * Copies the tags of a listed S3 object onto the flow file as
 * "s3.tag.<name>" attributes. Does nothing unless write_object_tags_ is set.
 * A failed tag lookup is only logged as a warning.
 *
 * @param object_attributes  listed object whose tags are requested
 * @param session            session used to set the flow file attributes
 * @param flow_file          flow file receiving the attributes
 */
void ListS3::writeObjectTags(
    const aws::s3::ListedObjectAttributes &object_attributes,
    core::ProcessSession &session,
    core::FlowFile& flow_file) {
  if (!write_object_tags_) {
    return;
  }

  // Reuse the credentials and client configuration of the list request.
  aws::s3::GetObjectTagsParameters params(list_request_params_->credentials, list_request_params_->client_config);
  params.bucket = list_request_params_->bucket;
  params.object_key = object_attributes.filename;
  params.version = object_attributes.version;

  const auto fetched_tags = s3_wrapper_.getObjectTags(params);
  if (!fetched_tags) {
    logger_->log_warn("Failed to get object tags for object {} in bucket {}", object_attributes.filename, params.bucket);
    return;
  }

  for (const auto& [tag_name, tag_value] : *fetched_tags) {
    session.putAttribute(flow_file, "s3.tag." + tag_name, tag_value);
  }
}
/**
 * Copies the user metadata of a listed S3 object onto the flow file as
 * "s3.user.metadata.<name>" attributes. Does nothing unless
 * write_user_metadata_ is set. A failed head-object request is only logged
 * as a warning.
 *
 * @param object_attributes  listed object whose metadata is requested
 * @param session            session used to set the flow file attributes
 * @param flow_file          flow file receiving the attributes
 */
void ListS3::writeUserMetadata(
    const aws::s3::ListedObjectAttributes &object_attributes,
    core::ProcessSession &session,
    core::FlowFile& flow_file) {
  if (!write_user_metadata_) {
    return;
  }

  // Reuse the credentials and client configuration of the list request.
  aws::s3::HeadObjectRequestParameters params(list_request_params_->credentials, list_request_params_->client_config);
  params.bucket = list_request_params_->bucket;
  params.object_key = object_attributes.filename;
  params.version = object_attributes.version;
  params.requester_pays = requester_pays_;

  const auto head_result = s3_wrapper_.headObject(params);
  if (!head_result) {
    logger_->log_warn("Failed to get object metadata for object {} in bucket {}", params.object_key, params.bucket);
    return;
  }

  for (const auto& [metadata_name, metadata_value] : head_result->user_metadata_map) {
    session.putAttribute(flow_file, "s3.user.metadata." + metadata_name, metadata_value);
  }
}
/**
 * Creates a flow file describing one listed S3 object, fills in its
 * attributes and transfers it to the Success relationship.
 *
 * @param session            session used to create and transfer the flow file
 * @param object_attributes  attributes of the listed object
 */
void ListS3::createNewFlowFile(
    core::ProcessSession &session,
    const aws::s3::ListedObjectAttributes &object_attributes) {
  auto new_flow_file = session.create();
  auto& target = *new_flow_file;

  // "s3.lastModified" holds the modification time as a count of milliseconds since the clock's epoch.
  const auto last_modified_ms = object_attributes.last_modified.time_since_epoch() / std::chrono::milliseconds(1);
  const auto is_latest_text = object_attributes.is_latest ? "true" : "false";

  session.putAttribute(target, "s3.bucket", list_request_params_->bucket);
  session.putAttribute(target, core::SpecialFlowAttribute::FILENAME, object_attributes.filename);
  session.putAttribute(target, "s3.etag", object_attributes.etag);
  session.putAttribute(target, "s3.isLatest", is_latest_text);
  session.putAttribute(target, "s3.lastModified", std::to_string(last_modified_ms));
  session.putAttribute(target, "s3.length", std::to_string(object_attributes.length));
  session.putAttribute(target, "s3.storeClass", object_attributes.store_class);

  // The version attribute is written only when the object has a version.
  const bool has_version = !object_attributes.version.empty();
  if (has_version) {
    session.putAttribute(target, "s3.version", object_attributes.version);
  }

  writeObjectTags(object_attributes, session, target);
  writeUserMetadata(object_attributes, session, target);

  session.transfer(new_flow_file, Success);
}
/**
 * Lists the configured bucket and emits one flow file for every object that
 * the stored listing state does not report as already listed. The updated
 * state is stored afterwards. The processor yields when the listing fails or
 * when no new object was found.
 *
 * @param context  used to yield the processor
 * @param session  session in which the flow files are created
 */
void ListS3::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  logger_->log_trace("ListS3 onTrigger");

  auto listed_objects = s3_wrapper_.listBucket(*list_request_params_);
  if (!listed_objects) {
    logger_->log_error("Failed to list S3 bucket {}", list_request_params_->bucket);
    context.yield();
    return;
  }

  // Filter against the state as it was stored; accumulate updates in a separate copy.
  auto previous_state = state_manager_->getCurrentState();
  auto updated_state = previous_state;
  std::size_t new_object_count = 0;

  for (const auto& listed_object : *listed_objects) {
    if (!previous_state.wasObjectListedAlready(listed_object)) {
      createNewFlowFile(session, listed_object);
      ++new_object_count;
      updated_state.updateState(listed_object);
    }
  }

  logger_->log_debug("ListS3 transferred {} flow files", new_object_count);
  state_manager_->storeState(updated_state);

  if (new_object_count == 0) {
    logger_->log_debug("No new S3 objects were found in bucket {} to list", list_request_params_->bucket);
    context.yield();
  }
}
// Registers ListS3 as a Processor resource.
// NOTE(review): REGISTER_RESOURCE is presumably provided by core/Resource.h - confirm.
REGISTER_RESOURCE(ListS3, Processor);
} // namespace org::apache::nifi::minifi::aws::processors