blob: 4fcce6ce7194c4d767455fbbb313fd05b7d78847 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FetchFile.h"
#include <cerrno>
#include <filesystem>
#include <system_error>
#include <utility>
#include "utils/ProcessorConfigUtils.h"
#include "utils/FileReaderCallback.h"
#include "utils/file/FileUtils.h"
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
namespace org::apache::nifi::minifi::processors {
// "File to Fetch" property: path of the file whose content is read into the flow file.
// Expression language is enabled; no default value is set and the property is not marked as required.
// When it evaluates to an empty string, getFileToFetch() builds the path from the flow file's
// "absolute.path" and "filename" attributes instead.
const core::Property FetchFile::FileToFetch(
core::PropertyBuilder::createProperty("File to Fetch")
->withDescription("The fully-qualified filename of the file to fetch from the file system. If not defined the default ${absolute.path}/${filename} path is used.")
->supportsExpressionLanguage(true)
->build());
// "Completion Strategy" property: what happens to the source file after a successful fetch.
// Required; defaults to CompletionStrategyOption::NONE and only accepts the values listed by
// CompletionStrategyOption::values(). Parsed once in onSchedule() into completion_strategy_.
const core::Property FetchFile::CompletionStrategy(
core::PropertyBuilder::createProperty("Completion Strategy")
->withDescription("Specifies what to do with the original file on the file system once it has been pulled into MiNiFi")
->withDefaultValue<std::string>(toString(CompletionStrategyOption::NONE))
->withAllowableValues<std::string>(CompletionStrategyOption::values())
->isRequired(true)
->build());
// "Move Destination Directory" property: target directory for the MOVE_FILE completion strategy.
// Expression language is enabled, so onTrigger() re-evaluates it for every flow file.
// onSchedule() rejects an empty value when the completion strategy is MOVE_FILE.
const core::Property FetchFile::MoveDestinationDirectory(
core::PropertyBuilder::createProperty("Move Destination Directory")
->withDescription("The directory to move the original file to once it has been fetched from the file system. "
"This property is ignored unless the Completion Strategy is set to \"Move File\". If the directory does not exist, it will be created.")
->supportsExpressionLanguage(true)
->build());
// "Move Conflict Strategy" property: how a name clash in the move destination directory is resolved.
// Required; defaults to MoveConflictStrategyOption::RENAME and only accepts the values listed by
// MoveConflictStrategyOption::values(). Parsed once in onSchedule() into move_confict_strategy_.
const core::Property FetchFile::MoveConflictStrategy(
core::PropertyBuilder::createProperty("Move Conflict Strategy")
->withDescription("If Completion Strategy is set to Move File and a file already exists in the destination directory with the same name, "
"this property specifies how that naming conflict should be resolved")
->withDefaultValue<std::string>(toString(MoveConflictStrategyOption::RENAME))
->withAllowableValues<std::string>(MoveConflictStrategyOption::values())
->isRequired(true)
->build());
// "Log level when file not found" property: severity used by onTrigger() for the
// "File to fetch was not found" message. Required; defaults to LogLevelOption::LOGGING_ERROR and
// only accepts the values listed by LogLevelOption::values().
const core::Property FetchFile::LogLevelWhenFileNotFound(
core::PropertyBuilder::createProperty("Log level when file not found")
->withDescription("Log level to use in case the file does not exist when the processor is triggered")
->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
->withAllowableValues<std::string>(LogLevelOption::values())
->isRequired(true)
->build());
// "Log level when permission denied" property: severity used by onTrigger() for the
// "Read permission denied" message. Required; defaults to LogLevelOption::LOGGING_ERROR and
// only accepts the values listed by LogLevelOption::values().
const core::Property FetchFile::LogLevelWhenPermissionDenied(
core::PropertyBuilder::createProperty("Log level when permission denied")
->withDescription("Log level to use in case agent does not have sufficient permissions to read the file")
->withDefaultValue<std::string>(toString(LogLevelOption::LOGGING_ERROR))
->withAllowableValues<std::string>(LogLevelOption::values())
->isRequired(true)
->build());
// "success" relationship: onTrigger() transfers the flow file here after the file content has been written into it.
const core::Relationship FetchFile::Success("success", "Any FlowFile that is successfully fetched from the file system will be transferred to this Relationship.");
// "not.found" relationship: onTrigger() transfers the flow file here when the resolved path
// is not an existing regular file.
const core::Relationship FetchFile::NotFound(
"not.found",
"Any FlowFile that could not be fetched from the file system because the file could not be found will be transferred to this Relationship.");
// "permission.denied" relationship: used by onTrigger() when reading the file is rejected with EACCES.
const core::Relationship FetchFile::PermissionDenied(
"permission.denied",
"Any FlowFile that could not be fetched from the file system due to the user running MiNiFi not having sufficient permissions will be transferred to this Relationship.");
// "failure" relationship: used by onTrigger() for read errors other than EACCES, and when the
// MOVE_FILE completion strategy with the FAIL conflict strategy would hit an existing destination file.
const core::Relationship FetchFile::Failure(
"failure",
"Any FlowFile that could not be fetched from the file system for any reason other than insufficient permissions or the file not existing will be transferred to this Relationship.");
void FetchFile::initialize() {
setSupportedProperties(properties());
setSupportedRelationships(relationships());
}
// Reads the static configuration of the processor and caches it in member variables.
//
// context: process context the property values are read from; must not be null.
// Throws Exception(PROCESS_SCHEDULE_EXCEPTION) when the completion strategy is MOVE_FILE but
// "Move Destination Directory" is empty.
void FetchFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) {
  gsl_Expects(context);
  auto& process_context = *context;

  completion_strategy_ = utils::parseEnumProperty<CompletionStrategyOption>(process_context, CompletionStrategy);

  // Only the presence of the destination is validated here; the value itself is re-read per flow file in onTrigger().
  std::string configured_destination;
  process_context.getProperty(MoveDestinationDirectory.getName(), configured_destination);
  const bool destination_missing = configured_destination.empty();
  if (destination_missing && completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Move Destination Directory is required when Completion Strategy is set to Move File");
  }

  move_confict_strategy_ = utils::parseEnumProperty<MoveConflictStrategyOption>(process_context, MoveConflictStrategy);
  log_level_when_file_not_found_ = utils::parseEnumProperty<LogLevelOption>(process_context, LogLevelWhenFileNotFound);
  log_level_when_permission_denied_ = utils::parseEnumProperty<LogLevelOption>(process_context, LogLevelWhenPermissionDenied);
}
// Resolves the path of the file to fetch for the given flow file.
//
// context:   process context used to evaluate the "File to Fetch" property against the flow file.
// flow_file: flow file supplying the fallback "absolute.path" and "filename" attributes.
// Returns the evaluated property value when it is non-empty, otherwise <absolute.path>/<filename>.
std::filesystem::path FetchFile::getFileToFetch(core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
  std::string configured_path;
  context.getProperty(FileToFetch, configured_path, flow_file);

  if (configured_path.empty()) {
    // Property not set: fall back to the attributes carried by the flow file.
    std::string directory;
    flow_file->getAttribute("absolute.path", directory);
    std::string filename;
    flow_file->getAttribute("filename", filename);
    return std::filesystem::path(directory) / filename;
  }

  return configured_path;
}
// Forwards a log call to the logger method matching the given level.
//
// log_level: severity to log with; LOGGING_OFF (and any unrecognised value) logs nothing.
// args:      format string and arguments, perfectly forwarded to the selected logger method.
template<typename... Args>
void FetchFile::logWithLevel(LogLevelOption log_level, Args&&... args) const {
  const auto level = log_level.value();
  switch (level) {
    case LogLevelOption::LOGGING_ERROR:
      logger_->log_error(std::forward<Args>(args)...);
      return;
    case LogLevelOption::LOGGING_WARN:
      logger_->log_warn(std::forward<Args>(args)...);
      return;
    case LogLevelOption::LOGGING_INFO:
      logger_->log_info(std::forward<Args>(args)...);
      return;
    case LogLevelOption::LOGGING_DEBUG:
      logger_->log_debug(std::forward<Args>(args)...);
      return;
    case LogLevelOption::LOGGING_TRACE:
      logger_->log_trace(std::forward<Args>(args)...);
      return;
    case LogLevelOption::LOGGING_OFF:
    default:
      return;
  }
}
std::filesystem::path FetchFile::getMoveAbsolutePath(const std::filesystem::path& file_name) const {
return move_destination_directory_ / file_name;
}
// Returns true when an entry named file_name already exists in the move destination directory.
bool FetchFile::moveDestinationConflicts(const std::filesystem::path& file_name) const {
  const auto destination = getMoveAbsolutePath(file_name);
  return utils::file::FileUtils::exists(destination);
}
// Returns true only when the file is to be moved, the conflict strategy is FAIL, and the
// destination already contains an entry named file_name. The file system is consulted only
// when both configuration conditions hold.
bool FetchFile::moveWouldFailWithDestinationConflict(const std::filesystem::path& file_name) const {
  const bool fails_on_conflict = completion_strategy_ == CompletionStrategyOption::MOVE_FILE
      && move_confict_strategy_ == MoveConflictStrategyOption::FAIL;
  return fails_on_conflict && moveDestinationConflicts(file_name);
}
// Applies the configured conflict strategy when the move destination already holds file_name.
//
// file_to_fetch_path: path of the fetched source file.
// file_name:          file name component used to build the destination path.
//
// REPLACE_FILE renames the source over the existing destination, RENAME moves the source under a
// freshly generated identifier, KEEP_EXISTING removes the source. Any other strategy does nothing here.
// Errors from std::filesystem::rename/remove propagate to the caller as exceptions.
void FetchFile::executeMoveConflictStrategy(const std::filesystem::path& file_to_fetch_path, const std::filesystem::path& file_name) {
  if (move_confict_strategy_ == MoveConflictStrategyOption::REPLACE_FILE) {
    const auto destination = getMoveAbsolutePath(file_name);
    logger_->log_debug("Due to conflict replacing file '%s' by the Move Completion Strategy", destination.string());
    std::filesystem::rename(file_to_fetch_path, destination);
    return;
  }

  if (move_confict_strategy_ == MoveConflictStrategyOption::RENAME) {
    const std::filesystem::path unique_name{utils::IdGenerator::getIdGenerator()->generate().to_string().view()};
    logger_->log_debug("Due to conflict file '%s' is moved with generated name '%s' by the Move Completion Strategy", file_to_fetch_path.string(), unique_name.string());
    std::filesystem::rename(file_to_fetch_path, getMoveAbsolutePath(unique_name));
    return;
  }

  if (move_confict_strategy_ == MoveConflictStrategyOption::KEEP_EXISTING) {
    logger_->log_debug("Due to conflict file '%s' is deleted by the Move Completion Strategy", file_to_fetch_path.string());
    std::filesystem::remove(file_to_fetch_path);
  }
}
// Moves the fetched file into the move destination directory.
//
// file_to_fetch_path: path of the fetched source file.
// file_name:          file name component used to build the destination path.
//
// When the destination name is already taken, the work is delegated to executeMoveConflictStrategy().
// Otherwise the destination directory is created if it does not exist and the file is renamed into it.
// Errors from the std::filesystem calls propagate to the caller as exceptions.
void FetchFile::processMoveCompletion(const std::filesystem::path& file_to_fetch_path, const std::filesystem::path& file_name) {
  if (moveDestinationConflicts(file_name)) {
    executeMoveConflictStrategy(file_to_fetch_path, file_name);
    return;
  }

  const bool destination_dir_exists = utils::file::FileUtils::exists(move_destination_directory_);
  if (!destination_dir_exists) {
    std::filesystem::create_directories(move_destination_directory_);
  }

  const auto destination = getMoveAbsolutePath(file_name);
  logger_->log_debug("'%s' is moved to '%s' by the Move Completion Strategy", file_to_fetch_path.string(), destination.string());
  std::filesystem::rename(file_to_fetch_path, destination);
}
void FetchFile::executeCompletionStrategy(const std::filesystem::path& file_to_fetch_path, const std::filesystem::path& file_name) {
try {
if (completion_strategy_ == CompletionStrategyOption::MOVE_FILE) {
processMoveCompletion(file_to_fetch_path, file_name);
} else if (completion_strategy_ == CompletionStrategyOption::DELETE_FILE) {
logger_->log_debug("File '%s' is deleted by the Delete Completion Strategy", file_to_fetch_path.string());
std::filesystem::remove(file_to_fetch_path);
}
} catch(const std::filesystem::filesystem_error& ex) {
logger_->log_warn("Executing completion strategy failed due to filesystem error: %s", ex.what());
}
}
// Fetches the file referenced by the next incoming flow file and writes its content into that flow file.
//
// context: process context used to evaluate properties and to yield when there is no input; must not be null.
// session: process session the flow file is taken from, written through and transferred on; must not be null.
//
// Routing:
//  - NotFound:         the resolved path is not an existing regular file.
//  - PermissionDenied: the file status query or the read is rejected with a permission error.
//  - Failure:          any other status/read error, or a move that would fail on a destination conflict.
//  - Success:          the content was read; the completion strategy is executed afterwards.
void FetchFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  gsl_Expects(context && session);
  logger_->log_trace("FetchFile onTrigger");
  auto flow_file = session->get();
  if (!flow_file) {
    context->yield();
    return;
  }

  const auto file_to_fetch_path = getFileToFetch(*context, flow_file);

  // Query the status with the non-throwing overload. The throwing is_regular_file(path) raises
  // filesystem_error when the status cannot be determined (e.g. a parent directory is not
  // searchable); that exception would leave onTrigger without routing the flow file.
  std::error_code status_error;
  const auto file_status = std::filesystem::status(file_to_fetch_path, status_error);
  if (file_status.type() == std::filesystem::file_type::none) {
    // file_type::none means the status query itself failed; "not found" is reported as file_type::not_found instead.
    if (status_error == std::errc::permission_denied) {
      logWithLevel(log_level_when_permission_denied_, "Permission denied while checking file '%s' to be fetched!", file_to_fetch_path.string());
      session->transfer(flow_file, PermissionDenied);
    } else {
      logger_->log_error("Fetching file '%s' failed! %s", file_to_fetch_path.string(), status_error.message());
      session->transfer(flow_file, Failure);
    }
    return;
  }
  if (!std::filesystem::is_regular_file(file_status)) {
    logWithLevel(log_level_when_file_not_found_, "File to fetch was not found: '%s'!", file_to_fetch_path.string());
    session->transfer(flow_file, NotFound);
    return;
  }

  auto file_name = file_to_fetch_path.filename();

  // The destination supports expression language, so it is evaluated against each flow file.
  std::string move_destination_directory;
  context->getProperty(MoveDestinationDirectory, move_destination_directory, flow_file);
  move_destination_directory_ = move_destination_directory;

  // Check the FAIL conflict strategy before reading, so a doomed move does not produce a Success flow file.
  if (moveWouldFailWithDestinationConflict(file_name)) {
    logger_->log_error("Move destination (%s) conflicts with an already existing file!", move_destination_directory_.string());
    session->transfer(flow_file, Failure);
    return;
  }

  try {
    utils::FileReaderCallback callback(file_to_fetch_path);
    session->write(flow_file, std::move(callback));
    logger_->log_debug("Fetching file '%s' successful!", file_to_fetch_path.string());
    session->transfer(flow_file, Success);
  } catch (const utils::FileReaderCallbackIOError& io_error) {
    if (io_error.error_code == EACCES) {
      logWithLevel(log_level_when_permission_denied_, "Read permission denied for file '%s' to be fetched!", file_to_fetch_path.string());
      session->transfer(flow_file, PermissionDenied);
    } else {
      logger_->log_error("Fetching file '%s' failed! %s", file_to_fetch_path.string(), io_error.what());
      session->transfer(flow_file, Failure);
    }
    return;
  }

  // Only reached after a successful read; failures of the completion strategy are logged by the callee.
  executeCompletionStrategy(file_to_fetch_path, file_name);
}
// Registers FetchFile as a Processor resource via the REGISTER_RESOURCE macro
// (presumably provided by core/Resource.h — confirm against that header).
REGISTER_RESOURCE(FetchFile, Processor);
} // namespace org::apache::nifi::minifi::processors