blob: ddf9146cf38ccd8d0ab256e4db993c2017a442ae [file] [log] [blame]
/**
* @file PutFile.cpp
* PutFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutFile.h"
#include <sys/stat.h>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <memory>
#include <string>
#include <set>
#ifdef WIN32
#include <Windows.h>
#endif
#include "utils/file/FileUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
std::shared_ptr<utils::IdGenerator> PutFile::id_generator_ = utils::IdGenerator::getIdGenerator();
core::Property PutFile::Directory(
core::PropertyBuilder::createProperty("Directory")->withDescription("The output directory to which to put files")->supportsExpressionLanguage(true)->withDefaultValue(".")->build());
core::Property PutFile::ConflictResolution(
core::PropertyBuilder::createProperty("Conflict Resolution Strategy")->withDescription("Indicates what should happen when a file with the same name already exists in the output directory")
->withAllowableValue<std::string>(CONFLICT_RESOLUTION_STRATEGY_FAIL)->withAllowableValue(CONFLICT_RESOLUTION_STRATEGY_IGNORE)->withAllowableValue(CONFLICT_RESOLUTION_STRATEGY_REPLACE)
->withDefaultValue(CONFLICT_RESOLUTION_STRATEGY_FAIL)->build());
core::Property PutFile::CreateDirs("Create Missing Directories", "If true, then missing destination directories will be created. "
"If false, flowfiles are penalized and sent to failure.",
"true", true, "", { "Directory" }, { });
core::Property PutFile::MaxDestFiles(
core::PropertyBuilder::createProperty("Maximum File Count")->withDescription("Specifies the maximum number of files that can exist in the output directory")->withDefaultValue<int>(-1)->build());
#ifndef WIN32
core::Property PutFile::Permissions(
core::PropertyBuilder::createProperty("Permissions")
->withDescription("Sets the permissions on the output file to the value of this attribute. "
"Must be an octal number (e.g. 644 or 0755). Not supported on Windows systems.")
->build());
core::Property PutFile::DirectoryPermissions(
core::PropertyBuilder::createProperty("Directory Permissions")
->withDescription("Sets the permissions on the directories being created if 'Create Missing Directories' property is set. "
"Must be an octal number (e.g. 644 or 0755). Not supported on Windows systems.")
->build());
#endif
core::Relationship PutFile::Success("success", "All files are routed to success");
core::Relationship PutFile::Failure("failure", "Failed files (conflict, write failure, etc.) are transferred to failure");
void PutFile::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(Directory);
properties.insert(ConflictResolution);
properties.insert(CreateDirs);
properties.insert(MaxDestFiles);
#ifndef WIN32
properties.insert(Permissions);
properties.insert(DirectoryPermissions);
#endif
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Failure);
setSupportedRelationships(relationships);
}
void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
if (!context->getProperty(ConflictResolution.getName(), conflict_resolution_)) {
logger_->log_error("Conflict Resolution Strategy attribute is missing or invalid");
}
std::string value;
context->getProperty(CreateDirs.getName(), value);
utils::StringUtils::StringToBool(value, try_mkdirs_);
if (context->getProperty(MaxDestFiles.getName(), value)) {
core::Property::StringToInt(value, max_dest_files_);
}
#ifndef WIN32
getPermissions(context);
getDirectoryPermissions(context);
#endif
}
void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
if (IsNullOrEmpty(conflict_resolution_)) {
logger_->log_error("Conflict resolution value is invalid");
context->yield();
return;
}
std::shared_ptr<core::FlowFile> flowFile = session->get();
// Do nothing if there are no incoming files
if (!flowFile) {
return;
}
session->remove(flowFile);
std::string directory;
if (!context->getProperty(Directory, directory, flowFile)) {
logger_->log_error("Directory attribute is missing or invalid");
}
if (IsNullOrEmpty(directory)) {
logger_->log_error("Directory attribute evaluated to invalid value");
session->transfer(flowFile, Failure);
return;
}
std::string filename;
flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
std::string tmpFile = tmpWritePath(filename, directory);
logger_->log_debug("PutFile using temporary file %s", tmpFile);
// Determine dest full file paths
std::stringstream destFileSs;
destFileSs << directory << utils::file::FileUtils::get_separator() << filename;
std::string destFile = destFileSs.str();
logger_->log_debug("PutFile writing file %s into directory %s", filename, directory);
// If file exists, apply conflict resolution strategy
struct stat statResult;
if ((max_dest_files_ != -1) && utils::file::FileUtils::is_directory(directory.c_str())) {
int64_t count = 0;
// Callback, called for each file entry in the listed directory
// Return value is used to break (false) or continue (true) listing
auto lambda = [&count, this](const std::string&, const std::string&) -> bool {
return ++count < max_dest_files_;
};
utils::file::FileUtils::list_dir(directory, lambda, logger_, false);
if (count >= max_dest_files_) {
logger_->log_warn("Routing to failure because the output directory %s has at least %u files, which exceeds the "
"configured max number of files", directory, max_dest_files_);
session->transfer(flowFile, Failure);
return;
}
}
if (stat(destFile.c_str(), &statResult) == 0) {
logger_->log_warn("Destination file %s exists; applying Conflict Resolution Strategy: %s", destFile, conflict_resolution_);
if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
putFile(session, flowFile, tmpFile, destFile, directory);
} else if (conflict_resolution_ == CONFLICT_RESOLUTION_STRATEGY_IGNORE) {
session->transfer(flowFile, Success);
} else {
session->transfer(flowFile, Failure);
}
} else {
putFile(session, flowFile, tmpFile, destFile, directory);
}
}
std::string PutFile::tmpWritePath(const std::string &filename, const std::string &directory) const {
utils::Identifier tmpFileUuid = id_generator_->generate();
std::stringstream tmpFileSs;
tmpFileSs << directory;
auto lastSeparatorPos = filename.find_last_of(utils::file::FileUtils::get_separator());
if (lastSeparatorPos == std::string::npos) {
tmpFileSs << utils::file::FileUtils::get_separator() << "." << filename;
} else {
tmpFileSs << utils::file::FileUtils::get_separator() << filename.substr(0, lastSeparatorPos) << utils::file::FileUtils::get_separator() << "." << filename.substr(lastSeparatorPos + 1);
}
tmpFileSs << "." << tmpFileUuid.to_string();
std::string tmpFile = tmpFileSs.str();
return tmpFile;
}
bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<core::FlowFile> flowFile, const std::string &tmpFile, const std::string &destFile, const std::string &destDir) {
struct stat dir_stat;
if (stat(destDir.c_str(), &dir_stat) && try_mkdirs_) {
// Attempt to create directories in file's path
std::stringstream dir_path_stream;
logger_->log_debug("Destination directory does not exist; will attempt to create: ", destDir);
size_t i = 0;
auto pos = destFile.find(utils::file::FileUtils::get_separator());
while (pos != std::string::npos) {
auto dir_path_component = destFile.substr(i, pos - i);
dir_path_stream << dir_path_component;
auto dir_path = dir_path_stream.str();
if (!dir_path_component.empty()) {
logger_->log_debug("Attempting to create directory if it does not already exist: %s", dir_path);
if (!utils::file::FileUtils::exists(dir_path)) {
utils::file::FileUtils::create_dir(dir_path, false);
#ifndef WIN32
if (directory_permissions_.valid()) {
utils::file::FileUtils::set_permissions(dir_path, directory_permissions_.getValue());
}
#endif
}
dir_path_stream << utils::file::FileUtils::get_separator();
} else if (pos == 0) {
// Support absolute paths
dir_path_stream << utils::file::FileUtils::get_separator();
}
i = pos + 1;
pos = destFile.find(utils::file::FileUtils::get_separator(), pos + 1);
}
}
bool success = false;
if (flowFile->getSize() > 0) {
ReadCallback cb(tmpFile, destFile);
session->read(flowFile, &cb);
logger_->log_debug("Committing %s", destFile);
success = cb.commit();
} else {
std::ofstream outfile(destFile, std::ios::out | std::ios::binary);
if (!outfile.good()) {
logger_->log_error("Failed to create empty file: %s", destFile);
} else {
success = true;
}
}
#ifndef WIN32
if (permissions_.valid()) {
utils::file::FileUtils::set_permissions(destFile, permissions_.getValue());
}
#endif
if (success) {
session->transfer(flowFile, Success);
return true;
} else {
session->transfer(flowFile, Failure);
}
return false;
}
#ifndef WIN32
void PutFile::getPermissions(core::ProcessContext *context) {
std::string permissions_str;
context->getProperty(Permissions.getName(), permissions_str);
if (permissions_str.empty()) {
return;
}
try {
permissions_.setValue(std::stoi(permissions_str, 0, 8));
} catch(const std::exception&) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Permissions property is invalid");
}
if (!permissions_.valid()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Permissions property is invalid: out of bounds");
}
}
void PutFile::getDirectoryPermissions(core::ProcessContext *context) {
std::string dir_permissions_str;
context->getProperty(DirectoryPermissions.getName(), dir_permissions_str);
if (dir_permissions_str.empty()) {
return;
}
try {
directory_permissions_.setValue(std::stoi(dir_permissions_str, 0, 8));
} catch(const std::exception&) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Directory Permissions property is invalid");
}
if (!directory_permissions_.valid()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Directory Permissions property is invalid: out of bounds");
}
}
#endif
PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file)
: tmp_file_(tmp_file),
dest_file_(dest_file) {
}
// Copy the entire file contents to the temporary file
int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
// Copy file contents into tmp file
write_succeeded_ = false;
size_t size = 0;
uint8_t buffer[1024];
std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
do {
int read = stream->read(buffer, 1024);
if (read < 0) {
return -1;
}
if (read == 0) {
break;
}
tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
size += read;
} while (size < stream->size());
tmp_file_os.close();
if (tmp_file_os) {
write_succeeded_ = true;
}
return size;
}
// Renames tmp file to final destination
// Returns true if commit succeeded
bool PutFile::ReadCallback::commit() {
bool success = false;
logger_->log_info("PutFile committing put file operation to %s", dest_file_);
if (write_succeeded_) {
if (rename(tmp_file_.c_str(), dest_file_.c_str())) {
logger_->log_info("PutFile commit put file operation to %s failed because rename() call failed", dest_file_);
} else {
success = true;
logger_->log_info("PutFile commit put file operation to %s succeeded", dest_file_);
}
} else {
logger_->log_error("PutFile commit put file operation to %s failed because write failed", dest_file_);
}
return success;
}
// Clean up resources
PutFile::ReadCallback::~ReadCallback() {
// Clean up tmp file, if necessary
std::remove(tmp_file_.c_str());
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */