| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "FetchSFTP.h" |
| |
| #include <memory> |
| #include <algorithm> |
| #include <cstdint> |
| #include <cstring> |
| #include <iostream> |
| #include <iterator> |
| #include <limits> |
| #include <set> |
| #include <string> |
| #include <utility> |
| |
| #include "utils/ByteArrayCallback.h" |
| #include "core/FlowFile.h" |
| #include "core/logging/Logger.h" |
| #include "core/ProcessContext.h" |
| #include "core/Relationship.h" |
| #include "io/DataStream.h" |
| #include "io/StreamFactory.h" |
| #include "ResourceClaim.h" |
| #include "utils/StringUtils.h" |
| #include "utils/file/FileUtils.h" |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| namespace processors { |
| |
| core::Property FetchSFTP::RemoteFile( |
| core::PropertyBuilder::createProperty("Remote File")->withDescription("The fully qualified filename on the remote system") |
| ->isRequired(true)->supportsExpressionLanguage(true)->build()); |
| core::Property FetchSFTP::CompletionStrategy( |
| core::PropertyBuilder::createProperty("Completion Strategy")->withDescription("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be logged but the data will still be transferred.") |
| ->isRequired(true) |
| ->withAllowableValues<std::string>({COMPLETION_STRATEGY_NONE, |
| COMPLETION_STRATEGY_MOVE_FILE, |
| COMPLETION_STRATEGY_DELETE_FILE}) |
| ->withDefaultValue(COMPLETION_STRATEGY_NONE)->build()); |
| core::Property FetchSFTP::MoveDestinationDirectory( |
| core::PropertyBuilder::createProperty("Move Destination Directory")->withDescription("The directory on the remote server to move the original file to once it has been ingested into NiFi. " |
| "This property is ignored unless the Completion Strategy is set to 'Move File'. " |
| "The specified directory must already exist on the remote system if 'Create Directory' is disabled, or the rename will fail.") |
| ->isRequired(false)->supportsExpressionLanguage(true)->build()); |
| core::Property FetchSFTP::CreateDirectory( |
| core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.") |
| ->isRequired(true)->withDefaultValue<bool>(false)->build()); |
| core::Property FetchSFTP::DisableDirectoryListing( |
| core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("Control how 'Move Destination Directory' is created when 'Completion Strategy' is 'Move File' and 'Create Directory' is enabled. " |
| "If set to 'true', directory listing is not performed prior to create missing directories. " |
| "By default, this processor executes a directory listing command to see target directory existence before creating missing directories. " |
| "However, there are situations that you might need to disable the directory listing such as the following. " |
| "Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. " |
| "Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, " |
| "then an error is returned because the directory already exists.") |
| ->isRequired(false)->withDefaultValue<bool>(false)->build()); |
| core::Property FetchSFTP::UseCompression( |
| core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files") |
| ->isRequired(true)->withDefaultValue<bool>(false)->build()); |
| |
| core::Relationship FetchSFTP::Success("success", "All FlowFiles that are received are routed to success"); |
| core::Relationship FetchSFTP::CommsFailure("comms.failure", "Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship."); |
| core::Relationship FetchSFTP::NotFound("not.found", "Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship."); |
| core::Relationship FetchSFTP::PermissionDenied("permission.denied", "Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship."); |
| |
| void FetchSFTP::initialize() { |
| logger_->log_trace("Initializing FetchSFTP"); |
| |
| // Set the supported properties |
| std::set<core::Property> properties; |
| addSupportedCommonProperties(properties); |
| properties.insert(RemoteFile); |
| properties.insert(CompletionStrategy); |
| properties.insert(MoveDestinationDirectory); |
| properties.insert(CreateDirectory); |
| properties.insert(DisableDirectoryListing); |
| properties.insert(UseCompression); |
| setSupportedProperties(properties); |
| |
| // Set the supported relationships |
| std::set<core::Relationship> relationships; |
| relationships.insert(Success); |
| relationships.insert(CommsFailure); |
| relationships.insert(NotFound); |
| relationships.insert(PermissionDenied); |
| setSupportedRelationships(relationships); |
| } |
| |
| FetchSFTP::FetchSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/) |
| : SFTPProcessorBase(name, uuid), |
| create_directory_(false), |
| disable_directory_listing_(false) { |
| logger_ = logging::LoggerFactory<FetchSFTP>::getLogger(); |
| } |
| |
| FetchSFTP::~FetchSFTP() { |
| } |
| |
| void FetchSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { |
| parseCommonPropertiesOnSchedule(context); |
| |
| std::string value; |
| context->getProperty(CompletionStrategy.getName(), completion_strategy_); |
| if (!context->getProperty(CreateDirectory.getName(), value)) { |
| logger_->log_error("Create Directory attribute is missing or invalid"); |
| } else { |
| utils::StringUtils::StringToBool(value, create_directory_); |
| } |
| if (!context->getProperty(DisableDirectoryListing.getName(), value)) { |
| logger_->log_error("Disable Directory Listing attribute is missing or invalid"); |
| } else { |
| utils::StringUtils::StringToBool(value, disable_directory_listing_); |
| } |
| if (!context->getProperty(UseCompression.getName(), value)) { |
| logger_->log_error("Use Compression attribute is missing or invalid"); |
| } else { |
| utils::StringUtils::StringToBool(value, use_compression_); |
| } |
| |
| startKeepaliveThreadIfNeeded(); |
| } |
| |
| FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file, |
| utils::SFTPClient& client) |
| : logger_(logging::LoggerFactory<FetchSFTP::WriteCallback>::getLogger()) |
| , remote_file_(remote_file) |
| , client_(client) { |
| } |
| |
| FetchSFTP::WriteCallback::~WriteCallback() { |
| } |
| |
| int64_t FetchSFTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { |
| if (!client_.getFile(remote_file_, *stream)) { |
| throw client_.getLastError(); |
| } |
| return stream->getSize(); |
| } |
| |
| void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { |
| std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get()); |
| if (flow_file == nullptr) { |
| return; |
| } |
| |
| /* Parse common properties */ |
| SFTPProcessorBase::CommonProperties common_properties; |
| if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) { |
| context->yield(); |
| return; |
| } |
| |
| /* Parse processor-specific properties */ |
| std::string remote_file; |
| std::string move_destination_directory; |
| |
| context->getProperty(RemoteFile, remote_file, flow_file); |
| context->getProperty(MoveDestinationDirectory, move_destination_directory, flow_file); |
| |
| /* Get SFTPClient from cache or create it */ |
| const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname, |
| common_properties.port, |
| common_properties.username, |
| proxy_type_, |
| common_properties.proxy_host, |
| common_properties.proxy_port, |
| common_properties.proxy_username}; |
| auto client = getOrCreateConnection(connection_cache_key, |
| common_properties.password, |
| common_properties.private_key_path, |
| common_properties.private_key_passphrase, |
| common_properties.proxy_password); |
| if (client == nullptr) { |
| context->yield(); |
| return; |
| } |
| |
| /* |
| * Unless we're sure that the connection is good, we don't want to put it back to the cache. |
| * So we will only call this when we're sure that the connection is OK. |
| */ |
| auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() { |
| addConnectionToCache(connection_cache_key, std::move(client)); |
| }; |
| |
| /* Download file */ |
| WriteCallback write_callback(remote_file, *client); |
| try { |
| session->write(flow_file, &write_callback); |
| } catch (const utils::SFTPError& error) { |
| switch (error) { |
| case utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED: |
| session->transfer(flow_file, PermissionDenied); |
| put_connection_back_to_cache(); |
| return; |
| case utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS: |
| session->transfer(flow_file, NotFound); |
| put_connection_back_to_cache(); |
| return; |
| case utils::SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE: |
| case utils::SFTPError::SFTP_ERROR_IO_ERROR: |
| session->transfer(flow_file, CommsFailure); |
| return; |
| default: |
| session->transfer(flow_file, PermissionDenied); |
| return; |
| } |
| } |
| |
| /* Set attributes */ |
| std::string parent_path; |
| std::string child_path; |
| std::tie(parent_path, child_path) = utils::file::FileUtils::split_path(remote_file, true /*force_posix*/); |
| |
| session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, common_properties.hostname); |
| session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(common_properties.port)); |
| session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_FILENAME, remote_file); |
| flow_file->updateKeyedAttribute(FILENAME, child_path); |
| if (!parent_path.empty()) { |
| flow_file->updateKeyedAttribute(PATH, parent_path); |
| } |
| |
| /* Execute completion strategy */ |
| if (completion_strategy_ == COMPLETION_STRATEGY_DELETE_FILE) { |
| if (!client->removeFile(remote_file)) { |
| logger_->log_warn("Completion Strategy is Delete File, but failed to delete remote file \"%s\"", remote_file); |
| } |
| } else if (completion_strategy_ == COMPLETION_STRATEGY_MOVE_FILE) { |
| bool should_move = true; |
| if (create_directory_) { |
| auto res = createDirectoryHierarchy(*client, move_destination_directory, disable_directory_listing_); |
| if (res != SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK) { |
| should_move = false; |
| } |
| } |
| if (!should_move) { |
| logger_->log_warn("Completion Strategy is Move File, but failed to create Move Destination Directory \"%s\"", move_destination_directory); |
| } else { |
| auto target_path = utils::file::FileUtils::concat_path(move_destination_directory, child_path); |
| if (!client->rename(remote_file, target_path, false /*overwrite*/)) { |
| logger_->log_warn("Completion Strategy is Move File, but failed to move file \"%s\" to \"%s\"", remote_file, target_path); |
| } |
| } |
| } |
| |
| session->transfer(flow_file, Success); |
| put_connection_back_to_cache(); |
| } |
| |
| } /* namespace processors */ |
| } /* namespace minifi */ |
| } /* namespace nifi */ |
| } /* namespace apache */ |
| } /* namespace org */ |