blob: 8bb1bc66c867a9c8e140765d0965e45381fb3737 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutSFTP.h"
#include <memory>
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <iterator>
#include <limits>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "utils/ByteArrayCallback.h"
#include "core/FlowFile.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "io/DataStream.h"
#include "io/StreamFactory.h"
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
/*
 * Property descriptors supported by PutSFTP. All of them are registered in initialize().
 * Some are read once in onSchedule() and stored in member variables, the others are
 * looked up for every FlowFile in processOne(); each descriptor below notes which.
 */
/* Target directory on the server. Looked up per FlowFile in processOne(), which strips trailing slashes and treats an empty value as ".". */
core::Property PutSFTP::RemotePath(
core::PropertyBuilder::createProperty("Remote Path")->withDescription("The path on the remote system from which to pull or push files")
->isRequired(false)->supportsExpressionLanguage(true)->build());
/* Read in onSchedule() into create_directory_. */
core::Property PutSFTP::CreateDirectory(
core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
->isRequired(true)->withDefaultValue<bool>(false)->build());
/* Looked up in processOne(): a dynamic property of the same name is checked first, then the regular property. */
core::Property PutSFTP::DisableDirectoryListing(
core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("If set to 'true', directory listing is not performed prior to create missing directories. "
"By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
"However, there are situations that you might need to disable the directory listing such as the following. "
"Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
"Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
"then an error is returned because the directory already exists.")
->isRequired(false)->withDefaultValue<bool>(false)->build());
/* Read in onSchedule() into batch_size_; onTrigger() treats a value of 0 as "no limit". */
core::Property PutSFTP::BatchSize(
core::PropertyBuilder::createProperty("Batch Size")->withDescription("The maximum number of FlowFiles to send in a single connection")
->isRequired(true)->withDefaultValue<uint64_t>(500)->build());
/* Read in onSchedule() into conflict_resolution_ and compared against the CONFLICT_RESOLUTION_* constants in processOne(). */
core::Property PutSFTP::ConflictResolution(
core::PropertyBuilder::createProperty("Conflict Resolution")->withDescription("Determines how to handle the problem of filename collisions")
->isRequired(true)
->withAllowableValues<std::string>({CONFLICT_RESOLUTION_REPLACE,
CONFLICT_RESOLUTION_IGNORE,
CONFLICT_RESOLUTION_RENAME,
CONFLICT_RESOLUTION_REJECT,
CONFLICT_RESOLUTION_FAIL,
CONFLICT_RESOLUTION_NONE})
->withDefaultValue(CONFLICT_RESOLUTION_NONE)->build());
/* Read in onSchedule() into reject_zero_byte_; processOne() routes zero-size FlowFiles to Reject before a connection is requested. */
core::Property PutSFTP::RejectZeroByte(
core::PropertyBuilder::createProperty("Reject Zero-Byte Files")->withDescription("Determines whether or not Zero-byte files should be rejected without attempting to transfer")
->isRequired(false)->withDefaultValue<bool>(true)->build());
/* Read in onSchedule() into dot_rename_; only consulted in processOne() when no temporary filename is set. */
core::Property PutSFTP::DotRename(
core::PropertyBuilder::createProperty("Dot Rename")->withDescription("If true, then the filename of the sent file is prepended with a \".\" and then renamed back to the original once the file is completely sent. "
"Otherwise, there is no rename. This property is ignored if the Temporary Filename property is set.")
->isRequired(false)->withDefaultValue<bool>(true)->build());
/* Looked up per FlowFile in processOne(); when non-empty it is used as the upload name instead of the dot-prefixed name. */
core::Property PutSFTP::TempFilename(
core::PropertyBuilder::createProperty("Temporary Filename")->withDescription("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful completion will be renamed to the original filename. "
"If this value is set, the Dot Rename property is ignored.")
->isRequired(false)->supportsExpressionLanguage(true)->build());
/* Looked up per FlowFile in processOne() and parsed with core::Property::StringToDateTime; if parsing fails the timestamp is simply not set. */
core::Property PutSFTP::LastModifiedTime(
core::PropertyBuilder::createProperty("Last Modified Time")->withDescription("The lastModifiedTime to assign to the file after transferring it. "
"If not set, the lastModifiedTime will not be changed. "
"Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
"You may also use expression language such as ${file.lastModifiedTime}. "
"If the value is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
->isRequired(false)->supportsExpressionLanguage(true)->build());
/* Looked up per FlowFile in processOne() and parsed with core::Property::StringToPermissions; if parsing fails the permissions are not set. */
core::Property PutSFTP::Permissions(
core::PropertyBuilder::createProperty("Permissions")->withDescription("The permissions to assign to the file after transferring it. "
"Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). "
"If not set, the permissions will not be changed. "
"You may also use expression language such as ${file.permissions}. "
"If the value is invalid, the processor will not be invalid but will fail to change permissions of the file.")
->isRequired(false)->supportsExpressionLanguage(true)->build());
/* Looked up per FlowFile in processOne() and parsed with core::Property::StringToInt; if parsing fails the owner is not set. */
core::Property PutSFTP::RemoteOwner(
core::PropertyBuilder::createProperty("Remote Owner")->withDescription("Integer value representing the User ID to set on the file after transferring it. "
"If not set, the owner will not be set. You may also use expression language such as ${file.owner}. "
"If the value is invalid, the processor will not be invalid but will fail to change the owner of the file.")
->isRequired(false)->supportsExpressionLanguage(true)->build());
/* Looked up per FlowFile in processOne() and parsed with core::Property::StringToInt; if parsing fails the group is not set. */
core::Property PutSFTP::RemoteGroup(
core::PropertyBuilder::createProperty("Remote Group")->withDescription("Integer value representing the Group ID to set on the file after transferring it. "
"If not set, the group will not be set. You may also use expression language such as ${file.group}. "
"If the value is invalid, the processor will not be invalid but will fail to change the group of the file.")
->isRequired(false)->supportsExpressionLanguage(true)->build());
/* Read in onSchedule() into use_compression_. */
core::Property PutSFTP::UseCompression(
core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
->isRequired(true)->withDefaultValue<bool>(false)->build());
/*
 * Output relationships of the processor, registered in initialize().
 * processOne() routes every FlowFile it finishes handling to exactly one of them.
 */
core::Relationship PutSFTP::Success("success", "FlowFiles that are successfully sent will be routed to success");
core::Relationship PutSFTP::Reject("reject", "FlowFiles that were rejected by the destination system");
core::Relationship PutSFTP::Failure("failure", "FlowFiles that failed to send to the remote system; failure is usually looped back to this processor");
/*
 * Namespace-scope definitions of the class's static constexpr members.
 * They carry no initializer here, so the values are presumably given at the in-class
 * declarations in PutSFTP.h (not visible from this file). Before C++17 such a definition
 * is needed whenever a static constexpr member is odr-used.
 */
constexpr char const* PutSFTP::CONFLICT_RESOLUTION_REPLACE;
constexpr char const* PutSFTP::CONFLICT_RESOLUTION_IGNORE;
constexpr char const* PutSFTP::CONFLICT_RESOLUTION_RENAME;
constexpr char const* PutSFTP::CONFLICT_RESOLUTION_REJECT;
constexpr char const* PutSFTP::CONFLICT_RESOLUTION_FAIL;
constexpr char const* PutSFTP::CONFLICT_RESOLUTION_NONE;
constexpr char const* PutSFTP::ProcessorName;
void PutSFTP::initialize() {
logger_->log_trace("Initializing PutSFTP");
// Set the supported properties
std::set<core::Property> properties;
addSupportedCommonProperties(properties);
properties.insert(RemotePath);
properties.insert(CreateDirectory);
properties.insert(DisableDirectoryListing);
properties.insert(BatchSize);
properties.insert(ConflictResolution);
properties.insert(RejectZeroByte);
properties.insert(DotRename);
properties.insert(TempFilename);
properties.insert(LastModifiedTime);
properties.insert(Permissions);
properties.insert(RemoteOwner);
properties.insert(RemoteGroup);
properties.insert(UseCompression);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Reject);
relationships.insert(Failure);
setSupportedRelationships(relationships);
}
/**
 * Creates the processor with every PutSFTP-specific option switched off;
 * the effective values are read from the configuration in onSchedule().
 *
 * @param name processor name, handed over to SFTPProcessorBase
 * @param uuid processor identifier, handed over to SFTPProcessorBase
 */
PutSFTP::PutSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
    /* name is a by-value parameter that is not used again, so move it instead of copying it a second time */
    : SFTPProcessorBase(std::move(name), uuid),
      create_directory_(false),
      batch_size_(0),
      reject_zero_byte_(false),
      dot_rename_(false) {
  logger_ = logging::LoggerFactory<PutSFTP>::getLogger();
}
/* Defaulted: no cleanup beyond what the member and base-class destructors already do. */
PutSFTP::~PutSFTP() = default;
/**
 * Reads the settings that do not depend on a FlowFile and stores them in
 * member variables, then starts the keepalive thread if it is needed.
 *
 * A missing Create Directory, Batch Size or Use Compression value is logged as
 * an error; the other properties are read silently. In every case the member
 * keeps its previous value when the property cannot be read.
 *
 * @param context        source of the property values
 * @param sessionFactory not used by this processor
 */
void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  parseCommonPropertiesOnSchedule(context);

  std::string raw_value;

  if (context->getProperty(CreateDirectory.getName(), raw_value)) {
    utils::StringUtils::StringToBool(raw_value, create_directory_);
  } else {
    logger_->log_error("Create Directory attribute is missing or invalid");
  }

  if (context->getProperty(BatchSize.getName(), raw_value)) {
    core::Property::StringToInt(raw_value, batch_size_);
  } else {
    logger_->log_error("Batch Size attribute is missing or invalid");
  }

  /* Stored as text; it is compared against the CONFLICT_RESOLUTION_* constants later */
  context->getProperty(ConflictResolution.getName(), conflict_resolution_);

  if (context->getProperty(RejectZeroByte.getName(), raw_value)) {
    utils::StringUtils::StringToBool(raw_value, reject_zero_byte_);
  }

  if (context->getProperty(DotRename.getName(), raw_value)) {
    utils::StringUtils::StringToBool(raw_value, dot_rename_);
  }

  if (context->getProperty(UseCompression.getName(), raw_value)) {
    utils::StringUtils::StringToBool(raw_value, use_compression_);
  } else {
    logger_->log_error("Use Compression attribute is missing or invalid");
  }

  startKeepaliveThreadIfNeeded();
}
/**
 * Prepares a callback that uploads a FlowFile's content through an SFTP client.
 *
 * @param target_path         remote path the content is written to
 * @param client              SFTP client used for the upload; only a reference is kept
 * @param conflict_resolution conflict resolution strategy; process() uses it to decide whether to overwrite
 */
PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
                                    utils::SFTPClient& client,
                                    const std::string& conflict_resolution)
    : logger_(logging::LoggerFactory<PutSFTP::ReadCallback>::getLogger()),
      write_succeeded_(false),
      target_path_(target_path),
      client_(client),
      conflict_resolution_(conflict_resolution) {
}
/* Defaulted: no cleanup beyond what the member destructors already do. */
PutSFTP::ReadCallback::~ReadCallback() = default;
/**
 * Uploads the content of the stream to target_path_ through the SFTP client.
 *
 * @param stream content of the FlowFile being sent
 * @return the size reported by the stream
 * @throws the client's last error when the upload fails (processOne() catches it as utils::SFTPError)
 */
int64_t PutSFTP::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
  /* Existing files are only overwritten with the REPLACE strategy */
  const bool overwrite = (conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE);
  const auto expected_size = stream->getSize();

  if (!client_.putFile(target_path_, *stream, overwrite, expected_size)) {
    throw client_.getLastError();
  }

  return stream->getSize();
}
/**
 * Takes one FlowFile from the session and tries to upload it to the SFTP server.
 *
 * The steps are, in order: evaluate the per-FlowFile properties, reject zero-byte
 * content if configured, obtain a (possibly cached) connection, resolve filename
 * conflicts, create the remote directory if configured, upload the content under
 * a temporary name, move it to its final name and finally apply the requested
 * file attributes.
 *
 * The connection is only put back into the cache on the paths where it is known
 * to be usable; after a failed stat, upload or rename it is not put back.
 *
 * @param context used to evaluate properties and to yield the processor
 * @param session used to fetch the FlowFile, read its content and route it
 * @return true when a FlowFile was routed to Success, Reject or Failure, so the
 *         caller may go on with the next one; false when there was no FlowFile,
 *         or when the common properties could not be parsed, no connection could
 *         be obtained or the directory check failed (in those cases the processor
 *         yields and the FlowFile is not routed anywhere by this function)
 */
bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
  if (flow_file == nullptr) {
    return false;
  }

  /* Parse common properties */
  SFTPProcessorBase::CommonProperties common_properties;
  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
    context->yield();
    return false;
  }

  /* Parse processor-specific properties */
  std::string filename;
  std::string remote_path;
  bool disable_directory_listing = false;
  std::string temp_file_name;
  bool last_modified_time_set = false;
  int64_t last_modified_time = 0;
  bool permissions_set = false;
  uint32_t permissions = 0U;
  bool remote_owner_set = false;
  uint64_t remote_owner = 0U;
  bool remote_group_set = false;
  uint64_t remote_group = 0U;

  flow_file->getKeyedAttribute(FILENAME, filename);

  std::string value;
  context->getProperty(RemotePath, remote_path, flow_file);
  /* Remove trailing slashes */
  while (remote_path.size() > 1U && remote_path.back() == '/') {
    remote_path.resize(remote_path.size() - 1);
  }
  /* Empty path means current directory, so we change it to '.' */
  if (remote_path.empty()) {
    remote_path = ".";
  }
  /* A dynamic property with the same name takes precedence over the regular one */
  if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
    utils::StringUtils::StringToBool(value, disable_directory_listing);
  } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
    utils::StringUtils::StringToBool(value, disable_directory_listing);
  }
  context->getProperty(TempFilename, temp_file_name, flow_file);
  /* Attribute values that cannot be parsed are ignored: the attribute is then left untouched */
  if (context->getProperty(LastModifiedTime, value, flow_file)) {
    if (core::Property::StringToDateTime(value, last_modified_time)) {
      last_modified_time_set = true;
    }
  }
  if (context->getProperty(Permissions, value, flow_file)) {
    if (core::Property::StringToPermissions(value, permissions)) {
      permissions_set = true;
    }
  }
  if (context->getProperty(RemoteOwner, value, flow_file)) {
    if (core::Property::StringToInt(value, remote_owner)) {
      remote_owner_set = true;
    }
  }
  if (context->getProperty(RemoteGroup, value, flow_file)) {
    if (core::Property::StringToInt(value, remote_group)) {
      remote_group_set = true;
    }
  }

  /* Reject zero-byte files if needed */
  if (reject_zero_byte_ && flow_file->getSize() == 0U) {
    logger_->log_debug("Rejecting %s because it is zero bytes", filename.c_str());
    session->transfer(flow_file, Reject);
    return true;
  }

  /* Get SFTPClient from cache or create it */
  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
                                                                      common_properties.port,
                                                                      common_properties.username,
                                                                      proxy_type_,
                                                                      common_properties.proxy_host,
                                                                      common_properties.proxy_port,
                                                                      common_properties.proxy_username};
  auto client = getOrCreateConnection(connection_cache_key,
                                      common_properties.password,
                                      common_properties.private_key_path,
                                      common_properties.private_key_passphrase,
                                      common_properties.proxy_password);
  if (client == nullptr) {
    context->yield();
    return false;
  }

  /*
   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
   * So we will only call this when we're sure that the connection is OK.
   * The client is moved out, so this must be the last use of it on any path.
   */
  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
    addConnectionToCache(connection_cache_key, std::move(client));
  };

  /* Try to detect conflicts if needed */
  std::string resolved_filename = filename;
  if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
    std::string target_path = utils::file::FileUtils::concat_path(remote_path, filename, true /*force_posix*/);
    LIBSSH2_SFTP_ATTRIBUTES attrs;
    if (!client->stat(target_path, true /*follow_symlinks*/, attrs)) {
      /* "File does not exist" simply means there is no conflict; anything else is an error */
      if (client->getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
        logger_->log_error("Failed to stat %s", target_path.c_str());
        session->transfer(flow_file, Failure);
        return true;
      }
    } else {
      if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
        logger_->log_error("Rejecting %s because a directory with the same name already exists", filename.c_str());
        session->transfer(flow_file, Reject);
        put_connection_back_to_cache();
        return true;
      }
      logger_->log_debug("Found file with the same name as the target file: %s", filename.c_str());
      if (conflict_resolution_ == CONFLICT_RESOLUTION_IGNORE) {
        logger_->log_debug("Routing %s to SUCCESS despite a file with the same name already existing", filename.c_str());
        session->transfer(flow_file, Success);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_REJECT) {
        logger_->log_debug("Routing %s to REJECT because a file with the same name already exists", filename.c_str());
        session->transfer(flow_file, Reject);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_FAIL) {
        logger_->log_debug("Routing %s to FAILURE because a file with the same name already exists", filename.c_str());
        session->transfer(flow_file, Failure);
        put_connection_back_to_cache();
        return true;
      } else if (conflict_resolution_ == CONFLICT_RESOLUTION_RENAME) {
        /* Try "1.<filename>" ... "99.<filename>" until a name is found that does not exist yet */
        std::string possible_resolved_filename;
        bool unique_name_generated = false;
        for (int i = 1; i < 100; i++) {
          possible_resolved_filename = std::to_string(i) + "." + filename;
          std::string possible_resolved_path = utils::file::FileUtils::concat_path(remote_path, possible_resolved_filename, true /*force_posix*/);
          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
            if (client->getLastError() == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
              unique_name_generated = true;
              break;
            } else {
              logger_->log_error("Failed to stat %s", possible_resolved_path.c_str());
              session->transfer(flow_file, Failure);
              return true;
            }
          }
        }
        if (unique_name_generated) {
          logger_->log_debug("Resolved %s to %s", filename.c_str(), possible_resolved_filename.c_str());
          resolved_filename = std::move(possible_resolved_filename);
        } else {
          logger_->log_error("Rejecting %s because a unique name could not be determined after 99 attempts", filename.c_str());
          session->transfer(flow_file, Reject);
          put_connection_back_to_cache();
          return true;
        }
      }
      /* With REPLACE we fall through and overwrite the existing file during upload/rename */
    }
  }

  /* Create remote directory if needed */
  if (create_directory_) {
    auto res = createDirectoryHierarchy(*client, remote_path, disable_directory_listing);
    switch (res) {
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
        break;
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
        context->yield();
        return false;
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED:
        session->transfer(flow_file, Failure);
        put_connection_back_to_cache();
        return true;
      default:
        logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
        context->yield();
        return false;
    }
  }

  /* Upload file */
  /* Name used while the transfer is in progress: a temporary filename wins over dot rename */
  std::string upload_filename;
  if (!IsNullOrEmpty(temp_file_name)) {
    upload_filename = temp_file_name;
  } else if (dot_rename_) {
    upload_filename = "." + resolved_filename;
  } else {
    upload_filename = resolved_filename;
  }
  /*
   * Both paths are built with the same helper on purpose: the comparison below decides
   * whether a rename is needed, so identical names must always yield identical paths
   * (joining with a hard-coded "/" here could produce a different spelling of the same
   * file, e.g. when the remote path is "/").
   */
  std::string target_path = utils::file::FileUtils::concat_path(remote_path, upload_filename, true /*force_posix*/);
  std::string final_target_path = utils::file::FileUtils::concat_path(remote_path, resolved_filename, true /*force_posix*/);
  logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());

  ReadCallback read_callback(target_path, *client, conflict_resolution_);
  try {
    session->read(flow_file, &read_callback);
  } catch (const utils::SFTPError&) {
    logger_->log_error("Failed to upload %s to %s", filename.c_str(), target_path.c_str());
    session->transfer(flow_file, Failure);
    return true;
  }

  /* Move file to its final place */
  if (target_path != final_target_path) {
    if (!client->rename(target_path, final_target_path, conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/)) {
      logger_->log_error("Failed to move temporary file %s to final path %s", target_path.c_str(), final_target_path.c_str());
      /* Do not leave the temporary file behind */
      if (!client->removeFile(target_path)) {
        logger_->log_error("Failed to remove temporary file %s", target_path.c_str());
      }
      session->transfer(flow_file, Failure);
      return true;
    }
  }

  /* Set file attributes if needed */
  if (last_modified_time_set ||
      permissions_set ||
      remote_owner_set ||
      remote_group_set) {
    utils::SFTPClient::SFTPAttributes attrs;
    attrs.flags = 0U;
    if (last_modified_time_set) {
      /*
       * NiFi doesn't set atime, only mtime, but because they can only be set together,
       * if we don't want to modify atime, we first have to get it.
       * Therefore setting them both saves an extra protocol round.
       */
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_MTIME | utils::SFTPClient::SFTP_ATTRIBUTE_ATIME;
      attrs.mtime = last_modified_time;
      attrs.atime = last_modified_time;
    }
    if (permissions_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_PERMISSIONS;
      attrs.permissions = permissions;
    }
    if (remote_owner_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_UID;
      attrs.uid = remote_owner;
    }
    if (remote_group_set) {
      attrs.flags |= utils::SFTPClient::SFTP_ATTRIBUTE_GID;
      attrs.gid = remote_group;
    }
    if (!client->setAttributes(final_target_path, attrs)) {
      /* This is not fatal, just log a warning (for the path the attributes were applied to) */
      logger_->log_warn("Failed to set file attributes for %s", final_target_path.c_str());
    }
  }

  session->transfer(flow_file, Success);
  put_connection_back_to_cache();
  return true;
}
/**
 * Processes FlowFiles one after the other until the batch size is reached or
 * processOne() reports that it cannot continue.
 *
 * @param context passed through to processOne()
 * @param session passed through to processOne()
 */
void PutSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  /* A batch size of 0 means that the batch is not limited */
  uint64_t remaining = std::numeric_limits<uint64_t>::max();
  if (batch_size_ > 0) {
    remaining = batch_size_;
  }

  while (remaining > 0) {
    if (!this->processOne(context, session)) {
      break;
    }
    --remaining;
  }
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */