blob: 3262febf1b38478fb4bb3ab432b886c89781fc08 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include "PutSFTP.h"
#include <memory>
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <iterator>
#include <limits>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "utils/ByteArrayCallback.h"
#include "core/FlowFile.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "io/DataStream.h"
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
#include "utils/ScopeGuard.h"
#include "utils/file/FileUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
core::Property PutSFTP::RemotePath(
core::PropertyBuilder::createProperty("Remote Path")->withDescription("The path on the remote system from which to pull or push files")
core::Property PutSFTP::CreateDirectory(
core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
core::Property PutSFTP::DisableDirectoryListing(
core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("If set to 'true', directory listing is not performed prior to create missing directories. "
"By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
"However, there are situations that you might need to disable the directory listing such as the following. "
"Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
"Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
"then an error is returned because the directory already exists.")
core::Property PutSFTP::BatchSize(
core::PropertyBuilder::createProperty("Batch Size")->withDescription("The maximum number of FlowFiles to send in a single connection")
core::Property PutSFTP::ConflictResolution(
core::PropertyBuilder::createProperty("Conflict Resolution")->withDescription("Determines how to handle the problem of filename collisions")
core::Property PutSFTP::RejectZeroByte(
core::PropertyBuilder::createProperty("Reject Zero-Byte Files")->withDescription("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
core::Property PutSFTP::DotRename(
core::PropertyBuilder::createProperty("Dot Rename")->withDescription("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the original once the file is completely sent. "
"Otherwise, there is no rename. This property is ignored if the Temporary Filename property is set.")
core::Property PutSFTP::TempFilename(
core::PropertyBuilder::createProperty("Temporary Filename")->withDescription("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful completion will be renamed to the original filename. "
"If this value is set, the Dot Rename property is ignored.")
core::Property PutSFTP::LastModifiedTime(
core::PropertyBuilder::createProperty("Last Modified Time")->withDescription("The lastModifiedTime to assign to the file after transferring it. "
"If not set, the lastModifiedTime will not be changed. "
"Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
"You may also use expression language such as ${file.lastModifiedTime}. "
"If the value is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
core::Property PutSFTP::Permissions(
core::PropertyBuilder::createProperty("Permissions")->withDescription("The permissions to assign to the file after transferring it. "
"Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). "
"If not set, the permissions will not be changed. "
"You may also use expression language such as ${file.permissions}. "
"If the value is invalid, the processor will not be invalid but will fail to change permissions of the file.")
core::Property PutSFTP::RemoteOwner(
core::PropertyBuilder::createProperty("Remote Owner")->withDescription("Integer value representing the User ID to set on the file after transferring it. "
"If not set, the owner will not be set. You may also use expression language such as ${file.owner}. "
"If the value is invalid, the processor will not be invalid but will fail to change the owner of the file.")
core::Property PutSFTP::RemoteGroup(
core::PropertyBuilder::createProperty("Remote Group")->withDescription("Integer value representing the Group ID to set on the file after transferring it. "
"If not set, the group will not be set. You may also use expression language such as ${}. "
"If the value is invalid, the processor will not be invalid but will fail to change the group of the file.")
core::Property PutSFTP::UseCompression(
core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
core::Relationship PutSFTP::Success("success", "FlowFiles that are successfully sent will be routed to success");
core::Relationship PutSFTP::Reject("reject", "FlowFiles that were rejected by the destination system");
core::Relationship PutSFTP::Failure("failure", "FlowFiles that failed to send to the remote system; failure is usually looped back to this processor");
void PutSFTP::initialize() {
logger_->log_trace("Initializing PutSFTP");
// Set the supported properties
std::set<core::Property> properties;
// Set the supported relationships
std::set<core::Relationship> relationships;
PutSFTP::PutSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
: SFTPProcessorBase(name, uuid),
dot_rename_(false) {
logger_ = logging::LoggerFactory<PutSFTP>::getLogger();
PutSFTP::~PutSFTP() {
void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
std::string value;
if (!context->getProperty(CreateDirectory.getName(), value)) {
logger_->log_error("Create Directory attribute is missing or invalid");
} else {
utils::StringUtils::StringToBool(value, create_directory_);
if (!context->getProperty(BatchSize.getName(), value)) {
logger_->log_error("Batch Size attribute is missing or invalid");
} else {
core::Property::StringToInt(value, batch_size_);
context->getProperty(ConflictResolution.getName(), conflict_resolution_);
if (context->getProperty(RejectZeroByte.getName(), value)) {
utils::StringUtils::StringToBool(value, reject_zero_byte_);
if (context->getProperty(DotRename.getName(), value)) {
utils::StringUtils::StringToBool(value, dot_rename_);
if (!context->getProperty(UseCompression.getName(), value)) {
logger_->log_error("Use Compression attribute is missing or invalid");
} else {
utils::StringUtils::StringToBool(value, use_compression_);
PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
utils::SFTPClient& client,
const std::string& conflict_resolution)
: logger_(logging::LoggerFactory<PutSFTP::ReadCallback>::getLogger())
, write_succeeded_(false)
, target_path_(target_path)
, client_(client)
, conflict_resolution_(conflict_resolution) {
PutSFTP::ReadCallback::~ReadCallback() {
int64_t PutSFTP::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
if (!client_.putFile(target_path_,
conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
stream->getSize() /*expected_size*/)) {
throw client_.getLastError();
return stream->getSize();
bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
if (flow_file == nullptr) {
return false;
/* Parse common properties */
SFTPProcessorBase::CommonProperties common_properties;
if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
return false;
/* Parse processor-specific properties */
std::string filename;
std::string remote_path;
bool disable_directory_listing = false;
std::string temp_file_name;
bool last_modified_time_set = false;
int64_t last_modified_time = 0U;
bool permissions_set = false;
uint32_t permissions = 0U;
bool remote_owner_set = false;
uint64_t remote_owner = 0U;
bool remote_group_set = false;
uint64_t remote_group = 0U;
flow_file->getKeyedAttribute(FILENAME, filename);
std::string value;
context->getProperty(RemotePath, remote_path, flow_file);
/* Remove trailing slashes */
while (remote_path.size() > 1U && remote_path.back() == '/') {
remote_path.resize(remote_path.size() - 1);
/* Empty path means current directory, so we change it to '.' */
if (remote_path.empty()) {
remote_path = ".";
if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
utils::StringUtils::StringToBool(value, disable_directory_listing);
} else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
utils::StringUtils::StringToBool(value, disable_directory_listing);
context->getProperty(TempFilename, temp_file_name, flow_file);
if (context->getProperty(LastModifiedTime, value, flow_file)) {
if (core::Property::StringToDateTime(value, last_modified_time)) {
last_modified_time_set = true;
if (context->getProperty(Permissions, value, flow_file)) {
if (core::Property::StringToPermissions(value, permissions)) {
permissions_set = true;
if (context->getProperty(RemoteOwner, value, flow_file)) {
if (core::Property::StringToInt(value, remote_owner)) {
remote_owner_set = true;
if (context->getProperty(RemoteGroup, value, flow_file)) {
if (core::Property::StringToInt(value, remote_group)) {
remote_group_set = true;
/* Reject zero-byte files if needed */
if (reject_zero_byte_ && flow_file->getSize() == 0U) {
logger_->log_debug("Rejecting %s because it is zero bytes", filename);
session->transfer(flow_file, Reject);
return true;
/* Get SFTPClient from cache or create it */
const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
auto client = getOrCreateConnection(connection_cache_key,
if (client == nullptr) {
return false;
* Unless we're sure that the connection is good, we don't want to put it back to the cache.
* So we will only call this when we're sure that the connection is OK.
auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
addConnectionToCache(connection_cache_key, std::move(client));
/* Try to detect conflicts if needed */
std::string resolved_filename = filename;
if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
std::string target_path = utils::file::FileUtils::concat_path(remote_path, filename, true /*force_posix*/);
if (!client->stat(target_path, true /*follow_symlinks*/, attrs)) {
if (client->getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
logger_->log_error("Failed to stat %s", target_path.c_str());
session->transfer(flow_file, Failure);
return true;
} else {
if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
logger_->log_error("Rejecting %s because a directory with the same name already exists", filename.c_str());
session->transfer(flow_file, Reject);
return true;
logger_->log_debug("Found file with the same name as the target file: %s", filename.c_str());
if (conflict_resolution_ == CONFLICT_RESOLUTION_IGNORE) {
logger_->log_debug("Routing %s to SUCCESS despite a file with the same name already existing", filename.c_str());
session->transfer(flow_file, Success);
return true;
} else if (conflict_resolution_ == CONFLICT_RESOLUTION_REJECT) {
logger_->log_debug("Routing %s to REJECT because a file with the same name already exists", filename.c_str());
session->transfer(flow_file, Reject);
return true;
} else if (conflict_resolution_ == CONFLICT_RESOLUTION_FAIL) {
logger_->log_debug("Routing %s to FAILURE because a file with the same name already exists", filename.c_str());
session->transfer(flow_file, Failure);
return true;
} else if (conflict_resolution_ == CONFLICT_RESOLUTION_RENAME) {
std::string possible_resolved_filename;
bool unique_name_generated = false;
for (int i = 1; i < 100; i++) {
std::stringstream possible_resolved_filename_ss;
possible_resolved_filename_ss << i << "." << filename;
possible_resolved_filename = possible_resolved_filename_ss.str();
std::string possible_resolved_path = utils::file::FileUtils::concat_path(remote_path, possible_resolved_filename, true /*force_posix*/);
if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
if (client->getLastError() == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
unique_name_generated = true;
} else {
logger_->log_error("Failed to stat %s", possible_resolved_path.c_str());
session->transfer(flow_file, Failure);
return true;
if (unique_name_generated) {
logger_->log_debug("Resolved %s to %s", filename.c_str(), possible_resolved_filename.c_str());
resolved_filename = std::move(possible_resolved_filename);
} else {
logger_->log_error("Rejecting %s because a unique name could not be determined after 99 attempts", filename.c_str());
session->transfer(flow_file, Reject);
return true;
/* Create remote directory if needed */
if (create_directory_) {
auto res = createDirectoryHierarchy(*client, remote_path, disable_directory_listing);
switch (res) {
case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
return false;
case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
session->transfer(flow_file, Failure);
return true;
case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
session->transfer(flow_file, Failure);
return true;
logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
return false;
/* Upload file */
std::stringstream target_path_ss;
target_path_ss << remote_path << "/";
if (!IsNullOrEmpty(temp_file_name)) {
target_path_ss << temp_file_name;
} else if (dot_rename_) {
target_path_ss << "." << resolved_filename;
} else {
target_path_ss << resolved_filename;
auto target_path = target_path_ss.str();
std::string final_target_path = utils::file::FileUtils::concat_path(remote_path, resolved_filename, true /*force_posix*/);
logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
ReadCallback read_callback(target_path.c_str(), *client, conflict_resolution_);
try {
session->read(flow_file, &read_callback);
} catch (const utils::SFTPError&) {
session->transfer(flow_file, Failure);
return true;
/* Move file to its final place */
if (target_path != final_target_path) {
if (!client->rename(target_path, final_target_path, conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/)) {
logger_->log_error("Failed to move temporary file %s to final path %s", target_path, final_target_path);
if (!client->removeFile(target_path)) {
logger_->log_error("Failed to remove temporary file %s", target_path.c_str());
session->transfer(flow_file, Failure);
return true;
/* Set file attributes if needed */
if (last_modified_time_set ||
permissions_set ||
remote_owner_set ||
remote_group_set) {
utils::SFTPClient::SFTPAttributes attrs;
attrs.flags = 0U;
if (last_modified_time_set) {
* NiFi doesn't set atime, only mtime, but because they can only be set together,
* if we don't want to modify atime, we first have to get it.
* Therefore setting them both saves an extra protocol round.
attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_MTIME | utils::SFTPClient::SFTP_ATTRIBUTE_ATIME;
attrs.mtime = last_modified_time;
attrs.atime = last_modified_time;
if (permissions_set) {
attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_PERMISSIONS;
attrs.permissions = permissions;
if (remote_owner_set) {
attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_UID;
attrs.uid = remote_owner;
if (remote_group_set) {
attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_GID;
attrs.gid = remote_group;
if (!client->setAttributes(final_target_path, attrs)) {
/* This is not fatal, just log a warning */
logger_->log_warn("Failed to set file attributes for %s", target_path);
session->transfer(flow_file, Success);
return true;
void PutSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
const uint64_t limit = batch_size_ > 0 ? batch_size_ : std::numeric_limits<uint64_t>::max();
for (uint64_t i = 0; i < limit; i++) {
if (!this->processOne(context, session)) {
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */