MINIFICPP-911 - Added ListSFTP and FetchSFTP processors
This closes #586.
Signed-off-by: Marc Parisi <phrocker@apache.org>
diff --git a/NOTICE b/NOTICE
index 8984cd0..252860b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -22,6 +22,7 @@
JniProcessSession extends and is based on ProcessSession
JniProcessSessionFactory extends and is based on ProcessSessionFactory
JniProvenanceReporter extends and is based on ProvenanceReporter
+ SFTPTestServer extends and is based on SSHTestServer
This includes derived works from the cURL (MIT/X-style licensed) project (https://github.com/curl/curl):
Copyright (c) 1996 - 2019, Daniel Stenberg, <daniel@haxx.se>, and many contributors, see the THANKS file.
diff --git a/extensions/sftp/CMakeLists.txt b/extensions/sftp/CMakeLists.txt
index 89837aa..2536ee1 100644
--- a/extensions/sftp/CMakeLists.txt
+++ b/extensions/sftp/CMakeLists.txt
@@ -70,6 +70,9 @@
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(minifi-sftp ${ZLIB_LIBRARIES})
+# Include RapidJSON
+include_directories(thirdparty/rapidjson-1.1.0/include)
+
if (WIN32)
message("${OPENSSL_LIBRARIES}")
set (WIN32_ARCHIVES "")
diff --git a/extensions/sftp/SFTPLoader.h b/extensions/sftp/SFTPLoader.h
index dd36e24..0ab0286 100644
--- a/extensions/sftp/SFTPLoader.h
+++ b/extensions/sftp/SFTPLoader.h
@@ -18,8 +18,10 @@
#ifndef EXTENSION_SFTPLOADER_H
#define EXTENSION_SFTPLOADER_H
-#include "processors/PutSFTP.h"
#include "core/ClassLoader.h"
+#include "processors/PutSFTP.h"
+#include "processors/FetchSFTP.h"
+#include "processors/ListSFTP.h"
class SFTPFactoryInitializer : public core::ObjectFactoryInitializer {
public:
@@ -57,6 +59,10 @@
virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSFTP")) {
return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PutSFTP>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "FetchSFTP")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::FetchSFTP>());
+ } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListSFTP")) {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ListSFTP>());
} else {
return nullptr;
}
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 6d4254b..bc27d5a 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -65,8 +65,77 @@
}
}
+static SFTPError libssh2_sftp_error_to_sftp_error(unsigned long libssh2_sftp_error) {  // Maps a libssh2 LIBSSH2_FX_* status code onto the coarser SFTPError categories.
+ switch (libssh2_sftp_error) {
+ case LIBSSH2_FX_OK:
+ return SFTPError::SFTP_ERROR_OK;
+ case LIBSSH2_FX_NO_SUCH_FILE:
+ case LIBSSH2_FX_NO_SUCH_PATH:
+ return SFTPError::SFTP_ERROR_FILE_NOT_EXISTS;  // a missing file and a missing path are reported identically
+ case LIBSSH2_FX_FILE_ALREADY_EXISTS:
+ return SFTPError::SFTP_ERROR_FILE_ALREADY_EXISTS;
+ case LIBSSH2_FX_PERMISSION_DENIED:
+ case LIBSSH2_FX_WRITE_PROTECT:
+ case LIBSSH2_FX_LOCK_CONFLICT:
+ return SFTPError::SFTP_ERROR_PERMISSION_DENIED;  // write-protect and lock conflicts are folded into permission denied
+ case LIBSSH2_FX_NO_CONNECTION:
+ case LIBSSH2_FX_CONNECTION_LOST:
+ return SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE;
+ case LIBSSH2_FX_EOF:  // the labels from here down share the default result; they are listed explicitly only for readability
+ case LIBSSH2_FX_FAILURE:
+ case LIBSSH2_FX_BAD_MESSAGE:
+ case LIBSSH2_FX_OP_UNSUPPORTED:
+ case LIBSSH2_FX_INVALID_HANDLE:
+ case LIBSSH2_FX_NO_MEDIA:
+ case LIBSSH2_FX_NO_SPACE_ON_FILESYSTEM:
+ case LIBSSH2_FX_QUOTA_EXCEEDED:
+ case LIBSSH2_FX_UNKNOWN_PRINCIPAL:
+ case LIBSSH2_FX_DIR_NOT_EMPTY:
+ case LIBSSH2_FX_NOT_A_DIRECTORY:
+ case LIBSSH2_FX_INVALID_FILENAME:
+ case LIBSSH2_FX_LINK_LOOP:
+ default:
+ return SFTPError::SFTP_ERROR_UNEXPECTED;  // also the result for any code not listed above
+ }
+}
+
constexpr size_t SFTPClient::MAX_BUFFER_SIZE;
+LastSFTPError::LastSFTPError()  // Starts in the "no error" state: raw code LIBSSH2_FX_OK and no explicit SFTPError stored.
+ : sftp_error_set_(false)
+ , libssh2_sftp_error_(LIBSSH2_FX_OK)
+ , sftp_error_(SFTPError::SFTP_ERROR_OK) {
+}
+
+LastSFTPError& LastSFTPError::operator=(unsigned long libssh2_sftp_error) {  // Records a raw libssh2 SFTP status code; it is only translated to SFTPError when converted.
+ sftp_error_set_ = false;  // a raw code supersedes any explicit SFTPError stored earlier
+ libssh2_sftp_error_ = libssh2_sftp_error;
+ return *this;
+}
+
+LastSFTPError& LastSFTPError::operator=(const SFTPError& sftp_error) {  // Records an explicit SFTPError that has no raw libssh2 code behind it.
+ sftp_error_set_ = true;  // the old raw code is left in place but is ignored by both conversions while this flag is set
+ sftp_error_ = sftp_error;
+ return *this;
+}
+
+LastSFTPError::operator unsigned long() const {  // Yields the stored raw libssh2 code.
+ if (sftp_error_set_) {
+ return LIBSSH2_FX_OK;  // an explicit SFTPError was stored instead of a raw code, so LIBSSH2_FX_OK is reported
+ } else {
+ return libssh2_sftp_error_;
+ }
+}
+
+LastSFTPError::operator SFTPError() const {  // Yields the error as an SFTPError category.
+ if (sftp_error_set_) {
+ return sftp_error_;  // explicit value stored through operator=(const SFTPError&)
+ } else {
+ return libssh2_sftp_error_to_sftp_error(libssh2_sftp_error_);  // otherwise translate the raw libssh2 code on demand
+ }
+}
+
+
SFTPClient::SFTPClient(const std::string &hostname, uint16_t port, const std::string& username)
: logger_(logging::LoggerFactory<SFTPClient>::getLogger()),
hostname_(hostname),
@@ -82,7 +151,8 @@
easy_(nullptr),
ssh_session_(nullptr),
sftp_session_(nullptr),
- connected_(false) {
+ connected_(false),
+ last_error_() {
easy_ = curl_easy_init();
if (easy_ == nullptr) {
throw std::runtime_error("Cannot create curl easy handle");
@@ -403,10 +473,39 @@
return true;
}
+SFTPError SFTPClient::getLastError() const {  // Returns the most recently recorded error as an SFTPError.
+ return last_error_;  // implicit conversion through LastSFTPError::operator SFTPError()
+}
+
bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_t expected_size /*= -1*/) {
- LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), LIBSSH2_FXF_READ, 0);
+ /**
+ * SFTP servers should not set the mode of an existing file on open
+ * (see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-13, Page 33
+ * "The 'attrs' field is ignored if an existing file is opened."
+ * Unfortunately this is a later SFTP version specification than implemented by most servers.)
+ * But because this is the intuitively correct behaviour (especially when opening a file for read only),
+ * most servers (OpenSSH for example) implement it this way.
+ * mina-sshd, the server we use for testing, however did not until recently,
+ * causing all files we read to be set to 0000.
+ * The fix to make it behave correctly has been merged back to master, but not yet released:
+ * https://github.com/apache/mina-sshd/commit/19adb39e4706929b6e5a1b2df056a2b2a29fac4d
+ * If we encounter real servers that behave like this, a workaround would be to stat before opening the file
+ * and "re-setting" the mode we read earlier on open.
+ * Another option would be to patch libssh2 to not send permissions in attrs when opening a file for read only.
+ */
+ LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), LIBSSH2_FXF_READ, 0 /*mode*/);
if (file_handle == nullptr) {
- logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ int ssh_errno = libssh2_session_last_errno(ssh_session_);
+ /* We can only get the sftp error in this case if the ssh error is a protocol error */
+ if (ssh_errno == LIBSSH2_ERROR_SFTP_PROTOCOL) {
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
+ } else {
+ last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+ char *err_msg = nullptr;
+ libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+ logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
+ }
return false;
}
utils::ScopeGuard guard([&file_handle]() {
@@ -419,7 +518,8 @@
do {
ssize_t read_ret = libssh2_sftp_read(file_handle, reinterpret_cast<char*>(buf.data()), buf.size());
if (read_ret < 0) {
- logger_->log_error("Failed to read remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+ logger_->log_error("Failed to read remote file \"%s\"", path.c_str());
return false;
} else if (read_ret == 0) {
logger_->log_trace("EOF while reading remote file \"%s\"", path.c_str());
@@ -429,8 +529,9 @@
total_read += read_ret;
int remaining = read_ret;
while (remaining > 0) {
- int write_ret = output.writeData(buf.data() + (buf.size() - remaining), remaining);
+ int write_ret = output.writeData(buf.data() + (read_ret - remaining), remaining);
if (write_ret < 0) {
+ last_error_ = LIBSSH2_FX_OK;
logger_->log_error("Failed to write output");
return false;
}
@@ -439,6 +540,7 @@
} while (true);
if (expected_size >= 0 && total_read != expected_size) {
+ last_error_ = LIBSSH2_FX_OK;
logger_->log_error("Remote file \"%s\" has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
return false;
}
@@ -451,8 +553,19 @@
logger_->log_trace("Opening remote file \"%s\"", path.c_str());
LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), flags, 0644);
if (file_handle == nullptr) {
- logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
- return false;
+ int ssh_errno = libssh2_session_last_errno(ssh_session_);
+ /* We can only get the sftp error in this case if the ssh error is a protocol error */
+ if (ssh_errno == LIBSSH2_ERROR_SFTP_PROTOCOL) {
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
+ } else {
+ last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+ char *err_msg = nullptr;
+ libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+ logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
+ }
+ /* Without a valid handle nothing below can work: report the failure, exactly as getFile does */
+ return false;
}
utils::ScopeGuard guard([this, &file_handle, &path]() {
logger_->log_trace("Closing remote file \"%s\"", path.c_str());
@@ -470,8 +581,7 @@
do {
int read_ret = input.readData(buf.data(), buf.size());
if (read_ret < 0) {
- char *err_msg = nullptr;
- libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+ last_error_ = LIBSSH2_FX_OK;
logger_->log_error("Error while reading input");
return false;
} else if (read_ret == 0) {
@@ -484,7 +594,8 @@
while (remaining > 0) {
int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
if (write_ret < 0) {
- logger_->log_error("Failed to write remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+ logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
return false;
}
logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
@@ -493,6 +604,7 @@
} while (true);
if (expected_size >= 0 && total_read != expected_size) {
+ last_error_ = LIBSSH2_FX_OK;
- logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
+ logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", expected_size, total_read);  // one argument per conversion; the stray path.c_str() used to be consumed by %ld
return false;
}
@@ -524,10 +636,11 @@
}
continue;
}
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
logger_->log_error("Failed to rename remote file \"%s\" to \"%s\", error: %s",
source_path.c_str(),
target_path.c_str(),
- sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ sftp_strerror(last_error_));
return false;
}
return true;
@@ -535,6 +648,7 @@
bool SFTPClient::createDirectoryHierarchy(const std::string& path) {
if (path.empty()) {
+ last_error_ = LIBSSH2_FX_OK;
return false;
}
bool absolute = path[0] == '/';
@@ -552,7 +666,8 @@
if (err != LIBSSH2_FX_FILE_ALREADY_EXISTS &&
err != LIBSSH2_FX_FAILURE &&
err != LIBSSH2_FX_PERMISSION_DENIED) {
- logger_->log_error("Failed to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(err));
+ last_error_ = err;
+ logger_->log_error("Failed to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(last_error_));
return false;
} else {
logger_->log_debug("Non-fatal failure to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(err));
@@ -564,7 +679,8 @@
bool SFTPClient::removeFile(const std::string& path) {
if (libssh2_sftp_unlink(sftp_session_, path.c_str()) != 0) {
- logger_->log_error("Failed to remove remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_error("Failed to remove remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
return false;
}
return true;
@@ -572,7 +688,8 @@
bool SFTPClient::removeDirectory(const std::string& path) {
if (libssh2_sftp_rmdir(sftp_session_, path.c_str()) != 0) {
- logger_->log_error("Failed to remove remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_error("Failed to remove remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
return false;
}
return true;
@@ -587,7 +704,8 @@
0 /* mode */,
LIBSSH2_SFTP_OPENDIR);
if (dir_handle == nullptr) {
- logger_->log_error("Failed to open remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_error("Failed to open remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
return false;
}
utils::ScopeGuard guard([&dir_handle]() {
@@ -605,16 +723,17 @@
longentry.size(),
&attrs);
if (ret < 0) {
- logger_->log_error("Failed to read remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_error("Failed to read remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
return false;
} else if (ret == 0) {
break;
}
if (follow_symlinks && attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && LIBSSH2_SFTP_S_ISLNK(attrs.permissions)) {
+ std::stringstream new_path;
+ new_path << path << "/" << filename.data();
auto orig_attrs = attrs;
- bool file_not_exists;
- if (!this->stat(path, true /*follow_symlinks*/, attrs, file_not_exists)) {
- logger_->log_debug("Failed to stat directory child \"%s/%s\", error: %s", path.c_str(), filename.data(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ if (!this->stat(new_path.str(), true /*follow_symlinks*/, attrs)) {
attrs = orig_attrs;
}
}
@@ -623,18 +742,14 @@
return true;
}
-bool SFTPClient::stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result, bool& file_not_exists) {
- file_not_exists = false;
+bool SFTPClient::stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result) {
if (libssh2_sftp_stat_ex(sftp_session_,
path.c_str(),
path.length(),
follow_symlinks ? LIBSSH2_SFTP_STAT : LIBSSH2_SFTP_LSTAT,
&result) != 0) {
- auto error = libssh2_sftp_last_error(sftp_session_);
- if (error == LIBSSH2_FX_NO_SUCH_FILE) {
- file_not_exists = true;
- }
- logger_->log_debug("Failed to stat remote path \"%s\", error: %s", path.c_str(), sftp_strerror(error));
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_debug("Failed to stat remote path \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
return false;
}
return true;
@@ -646,8 +761,7 @@
if ((!!(input.flags & SFTP_ATTRIBUTE_UID) != !!(input.flags & SFTP_ATTRIBUTE_GID)) ||
(!!(input.flags & SFTP_ATTRIBUTE_MTIME) != !!(input.flags & SFTP_ATTRIBUTE_ATIME))) {
/* Because we can only set these attributes in pairs, we must stat first to learn the other */
- bool file_not_exists;
- if (!this->stat(path, false /*follow_symlinks*/, attrs, file_not_exists)) {
+ if (!this->stat(path, false /*follow_symlinks*/, attrs)) {
return false;
}
}
@@ -678,7 +792,8 @@
path.length(),
LIBSSH2_SFTP_SETSTAT,
&attrs) != 0) {
- logger_->log_debug("Failed to setstat on remote path \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+ last_error_ = libssh2_sftp_last_error(sftp_session_);
+ logger_->log_debug("Failed to setstat on remote path \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
return false;
}
diff --git a/extensions/sftp/client/SFTPClient.h b/extensions/sftp/client/SFTPClient.h
index 4d6ce49..8e6c234 100644
--- a/extensions/sftp/client/SFTPClient.h
+++ b/extensions/sftp/client/SFTPClient.h
@@ -40,6 +40,36 @@
namespace minifi {
namespace utils {
+enum class SFTPError : uint8_t {  // Coarse error categories reported by SFTPClient::getLastError().
+ SFTP_ERROR_OK = 0,  // no SFTP-related error
+ SFTP_ERROR_PERMISSION_DENIED,
+ SFTP_ERROR_FILE_NOT_EXISTS,
+ SFTP_ERROR_FILE_ALREADY_EXISTS,
+ SFTP_ERROR_COMMUNICATIONS_FAILURE,
+ SFTP_ERROR_IO_ERROR,  // set directly by SFTPClient; never produced by translating a libssh2 SFTP status code
+ SFTP_ERROR_UNEXPECTED  // any libssh2 SFTP status code without a more specific category
+};
+
+class LastSFTPError {  // Holds the last error either as a raw libssh2 SFTP status code or as an explicit SFTPError.
+ public:
+ LastSFTPError();
+
+ LastSFTPError(const LastSFTPError&) = delete;  // neither copyable nor movable
+ LastSFTPError(LastSFTPError&&) = delete;
+ LastSFTPError& operator=(const LastSFTPError&) = delete;
+ LastSFTPError& operator=(LastSFTPError&&) = delete;
+
+ LastSFTPError& operator=(unsigned long libssh2_sftp_error);  // store a raw libssh2 SFTP status code
+ LastSFTPError& operator=(const SFTPError& sftp_error);  // store an explicit SFTPError
+ operator unsigned long() const;  // raw libssh2 code, or LIBSSH2_FX_OK if an explicit SFTPError is stored
+ operator SFTPError() const;  // explicit SFTPError, or the translation of the raw libssh2 code
+
+ private:
+ bool sftp_error_set_;  // true while sftp_error_ (rather than libssh2_sftp_error_) is the current value
+ unsigned long libssh2_sftp_error_;
+ SFTPError sftp_error_;
+};
+
class SFTPClient {
public:
@@ -77,6 +107,13 @@
bool sendKeepAliveIfNeeded(int &seconds_to_next);
+ /**
+ * If any function below this returns false, this function provides the last SFTP-related error.
+ * If a function did not fail because of an SFTP-related error, this function will return SFTP_ERROR_OK.
+ * If this function is called after a function returns true, the return value is UNDEFINED.
+ */
+ SFTPError getLastError() const;
+
bool getFile(const std::string& path, io::BaseStream& output, int64_t expected_size = -1);
bool putFile(const std::string& path, io::BaseStream& input, bool overwrite, int64_t expected_size = -1);
@@ -92,7 +129,7 @@
bool listDirectory(const std::string& path, bool follow_symlinks,
std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>>& children_result);
- bool stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result, bool& file_not_exists);
+ bool stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result);
static const uint32_t SFTP_ATTRIBUTE_PERMISSIONS = 0x00000001;
static const uint32_t SFTP_ATTRIBUTE_UID = 0x00000002;
@@ -148,6 +185,7 @@
bool connected_;
+ LastSFTPError last_error_;
};
} /* namespace utils */
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
new file mode 100644
index 0000000..3bc156c
--- /dev/null
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property FetchSFTP::RemoteFile(
+ core::PropertyBuilder::createProperty("Remote File")->withDescription("The fully qualified filename on the remote system")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property FetchSFTP::CompletionStrategy(
+ core::PropertyBuilder::createProperty("Completion Strategy")->withDescription("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be logged but the data will still be transferred.")
+ ->isRequired(true)
+ ->withAllowableValues<std::string>({COMPLETION_STRATEGY_NONE,
+ COMPLETION_STRATEGY_MOVE_FILE,
+ COMPLETION_STRATEGY_DELETE_FILE})
+ ->withDefaultValue(COMPLETION_STRATEGY_NONE)->build());
+core::Property FetchSFTP::MoveDestinationDirectory(
+ core::PropertyBuilder::createProperty("Move Destination Directory")->withDescription("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the Completion Strategy is set to 'Move File'. "
+ "The specified directory must already exist on the remote system if 'Create Directory' is disabled, or the rename will fail.")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property FetchSFTP::CreateDirectory(
+ core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property FetchSFTP::DisableDirectoryListing(
+ core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("Control how 'Move Destination Directory' is created when 'Completion Strategy' is 'Move File' and 'Create Directory' is enabled. "
+ "If set to 'true', directory listing is not performed prior to create missing directories. "
+ "By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
+ "However, there are situations that you might need to disable the directory listing such as the following. "
+ "Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
+ "Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
+ "then an error is returned because the directory already exists.")
+ ->isRequired(false)->withDefaultValue<bool>(false)->build());
+core::Property FetchSFTP::UseCompression(
+ core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
+
+core::Relationship FetchSFTP::Success("success", "All FlowFiles that are received are routed to success");
+core::Relationship FetchSFTP::CommsFailure("comms.failure", "Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.");
+core::Relationship FetchSFTP::NotFound("not.found", "Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.");
+core::Relationship FetchSFTP::PermissionDenied("permission.denied", "Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.");
+
+void FetchSFTP::initialize() {  // Registers the properties and relationships this processor supports.
+ logger_->log_trace("Initializing FetchSFTP");
+
+ // Set the supported properties
+ std::set<core::Property> properties;
+ addSupportedCommonProperties(properties);  // adds the common properties before the FetchSFTP-specific ones below
+ properties.insert(RemoteFile);
+ properties.insert(CompletionStrategy);
+ properties.insert(MoveDestinationDirectory);
+ properties.insert(CreateDirectory);
+ properties.insert(DisableDirectoryListing);
+ properties.insert(UseCompression);
+ setSupportedProperties(properties);
+
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ relationships.insert(CommsFailure);
+ relationships.insert(NotFound);
+ relationships.insert(PermissionDenied);
+ setSupportedRelationships(relationships);
+}
+
+FetchSFTP::FetchSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)  // Creates the processor; both directory flags start out false until onSchedule reads the configuration.
+ : SFTPProcessorBase(name, uuid),
+ create_directory_(false),
+ disable_directory_listing_(false) {
+ logger_ = logging::LoggerFactory<FetchSFTP>::getLogger();  // replace logger_ with a FetchSFTP-specific logger
+}
+
+FetchSFTP::~FetchSFTP() {
+}
+
+void FetchSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {  // Reads the properties that do not depend on a flow file.
+ parseCommonPropertiesOnSchedule(context);
+
+ std::string value;
+ context->getProperty(CompletionStrategy.getName(), completion_strategy_);  // NOTE(review): result is not checked, unlike the three properties below
+ if (!context->getProperty(CreateDirectory.getName(), value)) {
+ logger_->log_error("Create Directory attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, create_directory_);
+ }
+ if (!context->getProperty(DisableDirectoryListing.getName(), value)) {
+ logger_->log_error("Disable Directory Listing attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, disable_directory_listing_);
+ }
+ if (!context->getProperty(UseCompression.getName(), value)) {
+ logger_->log_error("Use Compression attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, use_compression_);
+ }
+
+ startKeepaliveThreadIfNeeded();
+}
+
+FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,  // Remembers the remote path and the SFTP client that process() will use.
+ utils::SFTPClient& client)
+ : logger_(logging::LoggerFactory<FetchSFTP::WriteCallback>::getLogger())
+ , remote_file_(remote_file)
+ , client_(client) {
+}
+
+FetchSFTP::WriteCallback::~WriteCallback() {
+}
+
+int64_t FetchSFTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {  // Downloads remote_file_ into stream and returns stream->getSize().
+ if (!client_.getFile(remote_file_, *stream)) {
+ throw client_.getLastError();  // thrown as a plain utils::SFTPError value; FetchSFTP::onTrigger catches it to choose a relationship
+ }
+ return stream->getSize();
+}
+
+void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {  // Fetches the file named by "Remote File" into the incoming flow file and routes it by outcome.
+ std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
+ if (flow_file == nullptr) {  // no incoming flow file: nothing to do
+ return;
+ }
+
+ /* Parse common properties */
+ SFTPProcessorBase::CommonProperties common_properties;
+ if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
+ context->yield();
+ return;
+ }
+
+ /* Parse processor-specific properties */
+ std::string remote_file;
+ std::string move_destination_directory;
+
+ context->getProperty(RemoteFile, remote_file, flow_file);
+ context->getProperty(MoveDestinationDirectory, move_destination_directory, flow_file);
+
+ /* Get SFTPClient from cache or create it */
+ const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+ common_properties.port,
+ common_properties.username,
+ proxy_type_,
+ common_properties.proxy_host,
+ common_properties.proxy_port,
+ common_properties.proxy_username};
+ auto client = getOrCreateConnection(connection_cache_key,
+ common_properties.password,
+ common_properties.private_key_path,
+ common_properties.private_key_passphrase,
+ common_properties.proxy_password);
+ if (client == nullptr) {
+ context->yield();
+ return;
+ }
+
+ /*
+ * Unless we're sure that the connection is good, we don't want to put it back to the cache.
+ * So we will only call this when we're sure that the connection is OK.
+ */
+ auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
+ addConnectionToCache(connection_cache_key, std::move(client));  // client is moved from here, so it must not be used after this lambda has run
+ };
+
+ /* Download file */
+ WriteCallback write_callback(remote_file, *client);
+ try {
+ session->write(flow_file, &write_callback);
+ } catch (const utils::SFTPError& error) {  // the value thrown by WriteCallback::process when getFile fails
+ switch (error) {
+ case utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED:
+ session->transfer(flow_file, PermissionDenied);
+ put_connection_back_to_cache();  // the connection is returned to the cache for this outcome
+ return;
+ case utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS:
+ session->transfer(flow_file, NotFound);
+ put_connection_back_to_cache();  // the connection is returned to the cache for this outcome
+ return;
+ case utils::SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE:
+ case utils::SFTPError::SFTP_ERROR_IO_ERROR:
+ session->transfer(flow_file, CommsFailure);  // the connection is not returned to the cache on this path
+ return;
+ default:
+ session->transfer(flow_file, PermissionDenied);  // NOTE(review): every other error (SFTP_ERROR_OK, SFTP_ERROR_FILE_ALREADY_EXISTS, SFTP_ERROR_UNEXPECTED) is routed to permission.denied - confirm this is intended
+ return;
+ }
+ }
+
+ /* Set attributes */
+ std::string parent_path;
+ std::string child_path;
+ std::tie(parent_path, child_path) = utils::file::FileUtils::split_path(remote_file, true /*force_posix*/);
+
+ session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, common_properties.hostname);
+ session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(common_properties.port));
+ session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_FILENAME, remote_file);
+ flow_file->updateKeyedAttribute(FILENAME, child_path);
+ if (!parent_path.empty()) {  // PATH is left unchanged when the remote file has no parent component
+ flow_file->updateKeyedAttribute(PATH, parent_path);
+ }
+
+ /* Execute completion strategy */
+ if (completion_strategy_ == COMPLETION_STRATEGY_DELETE_FILE) {
+ if (!client->removeFile(remote_file)) {
+ logger_->log_warn("Completion Strategy is Delete File, but failed to delete remote file \"%s\"", remote_file);
+ }
+ } else if (completion_strategy_ == COMPLETION_STRATEGY_MOVE_FILE) {
+ bool should_move = true;
+ if (create_directory_) {
+ auto res = createDirectoryHierarchy(*client, move_destination_directory, disable_directory_listing_);
+ if (res != SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK) {
+ should_move = false;  // the rename is skipped when the destination directory could not be prepared
+ }
+ }
+ if (!should_move) {
+ logger_->log_warn("Completion Strategy is Move File, but failed to create Move Destination Directory \"%s\"", move_destination_directory);
+ } else {
+ auto target_path = utils::file::FileUtils::concat_path(move_destination_directory, child_path);
+ if (!client->rename(remote_file, target_path, false /*overwrite*/)) {
+ logger_->log_warn("Completion Strategy is Move File, but failed to move file \"%s\" to \"%s\"", remote_file, target_path);
+ }
+ }
+ }
+
+ session->transfer(flow_file, Success);  // completion-strategy failures above are only logged; the flow file still goes to success
+ put_connection_back_to_cache();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
new file mode 100644
index 0000000..749b12e
--- /dev/null
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FETCH_SFTP_H__
+#define __FETCH_SFTP_H__
+
+#include <memory>
+#include <string>
+
+#include "SFTPProcessorBase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FetchSFTP : public SFTPProcessorBase {  // SFTP processor that fetches a remote file into an incoming FlowFile (per the REGISTER_RESOURCE text following this class)
+ public:
+
+ static constexpr char const *COMPLETION_STRATEGY_NONE = "None";  // the COMPLETION_STRATEGY_* strings are the values completion_strategy_ is compared against after a fetch
+ static constexpr char const *COMPLETION_STRATEGY_MOVE_FILE = "Move File";  // the remote file is renamed into the move destination directory
+ static constexpr char const *COMPLETION_STRATEGY_DELETE_FILE = "Delete File";  // the remote file is removed
+
+ static constexpr char const* ProcessorName = "FetchSFTP";
+
+
+ /*!
+ * Create a new processor with the given name and, optionally, an explicit identifier
+ */
+ FetchSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
+ virtual ~FetchSFTP();
+
+ // Supported Properties (declared here; their definitions are not part of this header)
+ static core::Property RemoteFile;
+ static core::Property CompletionStrategy;
+ static core::Property MoveDestinationDirectory;
+ static core::Property CreateDirectory;
+ static core::Property DisableDirectoryListing;
+ static core::Property UseCompression;
+
+ // Supported Relationships; a successfully fetched FlowFile is transferred to Success
+ static core::Relationship Success;
+ static core::Relationship CommsFailure;
+ static core::Relationship NotFound;
+ static core::Relationship PermissionDenied;
+
+ // Attributes written onto the fetched FlowFile: remote hostname, remote port and the remote file path
+ static constexpr char const* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
+ static constexpr char const* ATTRIBUTE_SFTP_REMOTE_PORT= "sftp.remote.port";
+ static constexpr char const* ATTRIBUTE_SFTP_REMOTE_FILENAME = "sftp.remote.filename";
+
+ virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+ virtual void initialize() override;
+ virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ class WriteCallback : public OutputStreamCallback {  // output-stream callback bound to one remote file; presumably streams it into the FlowFile content - implementation not shown in this header
+ public:
+ WriteCallback(const std::string& remote_file,
+ utils::SFTPClient& client);
+ ~WriteCallback();
+ virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+ const std::string remote_file_;  // stored by value (a copy of the constructor argument)
+ utils::SFTPClient& client_;  // held by reference, so the client must outlive this callback
+ };
+
+ private:
+
+ std::string completion_strategy_;  // one of the COMPLETION_STRATEGY_* strings
+ bool create_directory_;  // when true, the Move File strategy first creates the destination directory hierarchy
+ bool disable_directory_listing_;  // forwarded to createDirectoryHierarchy() when creating the move destination
+};
+
+REGISTER_RESOURCE(FetchSFTP, "Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
new file mode 100644
index 0000000..e4b56be
--- /dev/null
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -0,0 +1,1173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <list>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/ostreamwrapper.h"
+#include "rapidjson/istreamwrapper.h"
+#include "rapidjson/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ListSFTP::ListingStrategy(  // read by onSchedule() into listing_strategy_; a change between schedules invalidates the in-memory cache
+ core::PropertyBuilder::createProperty("Listing Strategy")->withDescription("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+ ->isRequired(true)
+ ->withAllowableValues<std::string>({LISTING_STRATEGY_TRACKING_TIMESTAMPS,
+ LISTING_STRATEGY_TRACKING_ENTITIES})
+ ->withDefaultValue(LISTING_STRATEGY_TRACKING_TIMESTAMPS)->build());
+core::Property ListSFTP::RemotePath(  // NOTE(review): the description speaks of a "filename" although this processor lists a path - wording looks copied from FetchSFTP; confirm
+ core::PropertyBuilder::createProperty("Remote Path")->withDescription("The fully qualified filename on the remote system")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property ListSFTP::SearchRecursively(  // parsed into search_recursively_; filterDirectory() rejects every directory when it is false
+ core::PropertyBuilder::createProperty("Search Recursively")->withDescription("If true, will pull files from arbitrarily nested subdirectories; "
+ "otherwise, will not traverse subdirectories")
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property ListSFTP::FollowSymlink(  // parsed into follow_symlink_ by onSchedule()
+ core::PropertyBuilder::createProperty("Follow symlink")->withDescription("If true, will pull even symbolic files and also nested symbolic subdirectories; "
+ "otherwise, will not read symbolic files and will not traverse symbolic link subdirectories")
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property ListSFTP::FileFilterRegex(  // NOTE(review): text says "Java Regular Expression" but onSchedule() compiles it with POSIX regcomp (flags 0), or std::regex on WIN32
+ core::PropertyBuilder::createProperty("File Filter Regex")->withDescription("Provides a Java Regular Expression for filtering Filenames; "
+ "if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+ ->isRequired(false)->build());
+core::Property ListSFTP::PathFilterRegex(  // compiled in onSchedule(); filterDirectory() applies it to the full directory path
+ core::PropertyBuilder::createProperty("Path Filter Regex")->withDescription("When Search Recursively is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+ ->isRequired(false)->build());
+core::Property ListSFTP::IgnoreDottedFiles(  // parsed into ignore_dotted_files_; filter() skips names starting with '.' when it is true
+ core::PropertyBuilder::createProperty("Ignore Dotted Files")->withDescription("If true, files whose names begin with a dot (\".\") will be ignored")
+ ->isRequired(true)->withDefaultValue<bool>(true)->build());
+core::Property ListSFTP::TargetSystemTimestampPrecision(  // kept as a string in target_system_timestamp_precision_
+ core::PropertyBuilder::createProperty("Target System Timestamp Precision")->withDescription("Specify timestamp precision at the target system. "
+ "Since this processor uses timestamp of entities to decide which should be listed, "
+ "it is crucial to use the right timestamp precision.")
+ ->isRequired(true)
+ ->withAllowableValues<std::string>({TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT,
+ TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS,
+ TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS,
+ TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES})
+ ->withDefaultValue(TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT)->build());
+core::Property ListSFTP::EntityTrackingTimeWindow(  // NOTE(review): not read by onSchedule() - confirm where this value is consumed
+ core::PropertyBuilder::createProperty("Entity Tracking Time Window")->withDescription("Specify how long this processor should track already-listed entities. "
+ "'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. "
+ "For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. "
+ "A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: "
+ "1. does not exist in the already-listed entities, "
+ "2. has newer timestamp than the cached entity, "
+ "3. has different size than the cached entity. "
+ "If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. "
+ "Used by 'Tracking Entities' strategy.")
+ ->isRequired(false)->build());
+core::Property ListSFTP::EntityTrackingInitialListingTarget(  // kept as a string in entity_tracking_initial_listing_target_
+ core::PropertyBuilder::createProperty("Entity Tracking Initial Listing Target")->withDescription("Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.")
+ ->withAllowableValues<std::string>({ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW,
+ ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE})
+ ->isRequired(false)->withDefaultValue(ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)->build());
+core::Property ListSFTP::MinimumFileAge(  // converted to milliseconds in onSchedule() and compared in filterFile()
+ core::PropertyBuilder::createProperty("Minimum File Age")->withDescription("The minimum age that a file must be in order to be pulled; "
+ "any file younger than this amount of time (according to last modification date) will be ignored")
+ ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("0 sec")->build());
+core::Property ListSFTP::MaximumFileAge(  // optional; filterFile() treats a stored value of 0 as "no upper limit"
+ core::PropertyBuilder::createProperty("Maximum File Age")->withDescription("The maximum age that a file must be in order to be pulled; "
+ "any file older than this amount of time (according to last modification date) will be ignored")
+ ->isRequired(false)->build());
+core::Property ListSFTP::MinimumFileSize(  // read directly into minimum_file_size_ by onSchedule()
+ core::PropertyBuilder::createProperty("Minimum File Size")->withDescription("The minimum size that a file must be in order to be pulled")
+ ->isRequired(true)->withDefaultValue<core::DataSizeValue>("0 B")->build());
+core::Property ListSFTP::MaximumFileSize(  // optional; filterFile() treats a stored value of 0 as "no upper limit"
+ core::PropertyBuilder::createProperty("Maximum File Size")->withDescription("The maximum size that a file must be in order to be pulled")
+ ->isRequired(false)->build());
+core::Property ListSFTP::StateFile(  // onSchedule() appends the processor UUID and a strategy suffix to this value to form the state file name
+ core::PropertyBuilder::createProperty("State File")->withDescription("Specifies the file that should be used for storing state about"
+ " what data has been ingested so that upon restart MiNiFi can resume from where it left off")
+ ->isRequired(true)->withDefaultValue("ListSFTP")->build());
+
+core::Relationship ListSFTP::Success("success", "All FlowFiles that are received are routed to success");  // the only relationship; createAndTransferFlowFileFromChild() routes every created FlowFile here
+
+const std::map<std::string, uint64_t> ListSFTP::LISTING_LAG_MAP = {  // timestamp precision name -> listing lag in milliseconds; looked up with .at() in listByTrackingTimestamps()
+ {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS, 1000},  // one second
+ {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES, 60000},  // one minute; there is no entry for the auto-detect or milliseconds precision names
+};
+
+void ListSFTP::initialize() {  // Registers the common SFTP properties, the ListSFTP-specific properties and the single Success relationship.
+ logger_->log_trace("Initializing ListSFTP");
+
+ // Set the supported properties
+ std::set<core::Property> properties;
+ addSupportedCommonProperties(properties);  // properties shared by the SFTP processors come first
+ properties.insert(ListingStrategy);
+ properties.insert(RemotePath);
+ properties.insert(SearchRecursively);
+ properties.insert(FollowSymlink);
+ properties.insert(FileFilterRegex);
+ properties.insert(PathFilterRegex);
+ properties.insert(IgnoreDottedFiles);
+ properties.insert(TargetSystemTimestampPrecision);
+ properties.insert(EntityTrackingTimeWindow);
+ properties.insert(EntityTrackingInitialListingTarget);
+ properties.insert(MinimumFileAge);
+ properties.insert(MaximumFileAge);
+ properties.insert(MinimumFileSize);
+ properties.insert(MaximumFileSize);
+ properties.insert(StateFile);
+ setSupportedProperties(properties);
+
+ // Set the supported relationships
+ std::set<core::Relationship> relationships;
+ relationships.insert(Success);
+ setSupportedRelationships(relationships);
+}
+
+ListSFTP::ListSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)  // Starts with every option false/zero and no listing state; onSchedule() later loads the configured values.
+ : SFTPProcessorBase(name, uuid)
+ , search_recursively_(false)
+ , follow_symlink_(false)
+ , file_filter_regex_set_(false)  // no compiled File Filter Regex yet, so the destructor must not free one
+ , path_filter_regex_set_(false)  // no compiled Path Filter Regex yet
+ , ignore_dotted_files_(false)  // the property default is true; onSchedule() overwrites this value
+ , minimum_file_age_(0U)
+ , maximum_file_age_(0U)  // 0 is treated as "no limit" by filterFile()
+ , minimum_file_size_(0U)
+ , maximum_file_size_(0U)  // 0 is treated as "no limit" by filterFile()
+ , already_loaded_from_cache_(false)
+ , last_listed_latest_entry_timestamp_(0U)
+ , last_processed_latest_entry_timestamp_(0U)
+ , initial_listing_complete_(false) {
+ logger_ = logging::LoggerFactory<ListSFTP>::getLogger();  // replace logger_ with one obtained for the ListSFTP class
+}
+
+ListSFTP::~ListSFTP() {  // Releases the POSIX regex objects compiled by onSchedule(); nothing to do on WIN32, where std::regex is used.
+#ifndef WIN32
+ if (file_filter_regex_set_) {  // only free a regex that was compiled successfully
+ regfree(&compiled_file_filter_regex_);
+ }
+ if (path_filter_regex_set_) {  // same guard for the path filter
+ regfree(&compiled_path_filter_regex_);
+ }
+#endif
+}
+
+void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {  // Loads all ListSFTP properties into member state, (re)compiles both filter regexes and selects the state file names.
+ parseCommonPropertiesOnSchedule(context);
+
+ std::string value;  // scratch buffer reused for every property that needs parsing
+ context->getProperty(ListingStrategy.getName(), listing_strategy_);
+ if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {  // strategy changed since the previous schedule
+ invalidateCache();
+ }
+ last_listing_strategy_ = listing_strategy_;
+ if (!context->getProperty(SearchRecursively.getName(), value)) {
+ logger_->log_error("Search Recursively attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, search_recursively_);
+ }
+ if (!context->getProperty(FollowSymlink.getName(), value)) {
+ logger_->log_error("Follow symlink attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, follow_symlink_);
+ }
+ if (context->getProperty(FileFilterRegex.getName(), file_filter_regex_)) {
+#ifndef WIN32
+ if (file_filter_regex_set_) {  // drop the regex compiled by a previous schedule before recompiling
+ regfree(&compiled_file_filter_regex_);
+ }
+ int ret = regcomp(&compiled_file_filter_regex_, file_filter_regex_.c_str(), 0);  // NOTE(review): flags 0 selects POSIX basic syntax although the property text promises a Java regex - confirm intent
+ if (ret != 0) {
+ logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+ file_filter_regex_set_ = false;
+ } else {
+ file_filter_regex_set_ = true;
+ }
+#else
+ try {
+ compiled_file_filter_regex_ = std::regex(file_filter_regex_);
+ file_filter_regex_set_ = true;
+ } catch (std::regex_error&) {
+ logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+ file_filter_regex_set_ = false;
+ }
+#endif
+ } else {
+ file_filter_regex_set_ = false;  // NOTE(review): on POSIX a previously compiled regex is not regfree'd on this path
+ }
+ if (context->getProperty(PathFilterRegex.getName(), path_filter_regex_)) {
+#ifndef WIN32
+ if (path_filter_regex_set_) {  // drop the regex compiled by a previous schedule before recompiling
+ regfree(&compiled_path_filter_regex_);
+ }
+ int ret = regcomp(&compiled_path_filter_regex_, path_filter_regex_.c_str(), 0);
+ if (ret != 0) {
+ logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+ path_filter_regex_set_ = false;  // must clear the PATH flag: the old regex was freed above and the file filter is unrelated
+ } else {
+ path_filter_regex_set_ = true;
+ }
+#else
+ try {
+ compiled_path_filter_regex_ = std::regex(path_filter_regex_);
+ path_filter_regex_set_ = true;
+ } catch (std::regex_error&) {
+ logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+ path_filter_regex_set_ = false;
+ }
+#endif
+ } else {
+ path_filter_regex_set_ = false;  // NOTE(review): on POSIX a previously compiled regex is not regfree'd on this path
+ }
+ if (!context->getProperty(IgnoreDottedFiles.getName(), value)) {
+ logger_->log_error("Ignore Dotted Files attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, ignore_dotted_files_);
+ }
+ context->getProperty(TargetSystemTimestampPrecision.getName(), target_system_timestamp_precision_);
+ context->getProperty(EntityTrackingInitialListingTarget.getName(), entity_tracking_initial_listing_target_);
+ if (!context->getProperty(MinimumFileAge.getName(), value)) {
+ logger_->log_error("Minimum File Age attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, minimum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(minimum_file_age_, unit, minimum_file_age_)) {  // parse, then convert in place to milliseconds
+ logger_->log_error("Minimum File Age attribute is invalid");
+ }
+ }
+ if (context->getProperty(MaximumFileAge.getName(), value)) {  // optional: left untouched when the property is absent
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, maximum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(maximum_file_age_, unit, maximum_file_age_)) {
+ logger_->log_error("Maximum File Age attribute is invalid");
+ }
+ }
+ if (!context->getProperty(MinimumFileSize.getName(), minimum_file_size_)) {
+ logger_->log_error("Minimum File Size attribute is invalid");
+ }
+ if (context->getProperty(MaximumFileSize.getName(), value)) {  // optional: left untouched when the property is absent
+ if (!core::DataSizeValue::StringToInt(value, maximum_file_size_)) {
+ logger_->log_error("Maximum File Size attribute is invalid");
+ }
+ }
+ context->getProperty(StateFile.getName(), value);  // base name; the UUID and a strategy suffix are appended below
+ if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
+ std::stringstream ss;
+ ss << value << "." << getUUIDStr() << ".TrackingTimestamps";
+ auto new_tracking_timestamps_state_filename = ss.str();
+ if (new_tracking_timestamps_state_filename != tracking_timestamps_state_filename_) {  // name changed: remove the state file written under the old name
+ if (!tracking_timestamps_state_filename_.empty()) {
+ if (unlink(tracking_timestamps_state_filename_.c_str()) != 0) {
+ logger_->log_error("Unable to delete old Tracking Timestamps state file \"%s\"",
+ tracking_timestamps_state_filename_.c_str());
+ }
+ }
+ }
+ tracking_timestamps_state_filename_ = new_tracking_timestamps_state_filename;
+ } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
+ std::stringstream ss;
+ ss << value << "." << getUUIDStr() << ".TrackingEntities";
+ auto new_tracking_entities_state_filename = ss.str();
+ ss << ".json";  // the json state file name is the plain state file name plus ".json"
+ auto new_tracking_entities_state_json_filename = ss.str();
+ if (new_tracking_entities_state_filename != tracking_entities_state_filename_) {  // name changed: remove both files written under the old names
+ if (!tracking_entities_state_filename_.empty()) {
+ if (unlink(tracking_entities_state_filename_.c_str()) != 0) {
+ logger_->log_error("Unable to delete old Tracking Entities state file \"%s\"",
+ tracking_entities_state_filename_.c_str());
+ }
+ }
+ if (!tracking_entities_state_json_filename_.empty()) {
+ if (unlink(tracking_entities_state_json_filename_.c_str()) != 0) {
+ logger_->log_error("Unable to delete old Tracking Entities json state file \"%s\"",
+ tracking_entities_state_json_filename_.c_str());
+ }
+ }
+ }
+ tracking_entities_state_filename_ = new_tracking_entities_state_filename;
+ tracking_entities_state_json_filename_ = new_tracking_entities_state_json_filename;
+ } else {
+ logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+ }
+
+ startKeepaliveThreadIfNeeded();
+}
+
+void ListSFTP::invalidateCache() {  // Resets the in-memory listing state of both strategies; called by onSchedule() when the Listing Strategy changes.
+ logger_->log_warn("Important properties have been reconfigured, invalidating in-memory cache");
+
+ already_loaded_from_cache_ = false;  // allows the state file to be loaded again on the next listing
+
+ last_run_time_ = std::chrono::time_point<std::chrono::steady_clock>();  // default-constructed time_point, i.e. the clock's epoch
+ last_listed_latest_entry_timestamp_ = 0U;
+ last_processed_latest_entry_timestamp_ = 0U;
+ latest_identifiers_processed_.clear();
+
+ initial_listing_complete_ = false;
+ already_listed_entities_.clear();
+}
+
+ListSFTP::Child::Child()  // Default child: not a directory, empty strings, all SFTP attribute bytes zeroed.
+ :directory(false) {
+ memset(&attrs, 0x00, sizeof(attrs));  // attrs is a plain C struct from libssh2, so it is cleared bytewise
+}
+
+ListSFTP::Child::Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child) {  // Builds a child from one directory-listing tuple, taking over its filename and attributes.
+ parent_path = parent_path_;
+ std::tie(filename, std::ignore, attrs) = std::move(sftp_child);  // the longentry element is discarded
+ directory = LIBSSH2_SFTP_S_ISDIR(attrs.permissions);  // the permissions flag bit is not re-checked here; filter() rejects entries that lack it
+}
+
+std::string ListSFTP::Child::getPath() const {  // Returns parent_path joined with filename.
+ return utils::file::FileUtils::concat_path(parent_path, filename, true /*force_posix*/);  // force_posix is passed as true for remote paths
+}
+
+bool ListSFTP::filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child) {  // Returns true when a listed entry should be kept; dispatches to filterFile() or filterDirectory() by entry type.
+ const std::string& filename = std::get<0>(sftp_child);
+ const LIBSSH2_SFTP_ATTRIBUTES& attrs = std::get<2>(sftp_child);
+ /* This should not happen */
+ if (filename.empty()) {
+ logger_->log_error("Listing directory \"%s\" returned an empty child", parent_path.c_str());
+ return false;
+ }
+ /* Ignore current dir and parent dir */
+ if (filename == "." || filename == "..") {
+ return false;
+ }
+ /* Dotted files */
+ if (ignore_dotted_files_ && filename[0] == '.') {  // filename is known to be non-empty here
+ logger_->log_debug("Ignoring \"%s/%s\" because Ignore Dotted Files is true", parent_path.c_str(), filename.c_str());
+ return false;
+ }
+ if (!(attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS)) {  // without the permissions field the entry type cannot be determined below
+ // TODO: maybe do a fallback stat here
+ logger_->log_error("Failed to get permissions in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+ return false;
+ }
+ if (LIBSSH2_SFTP_S_ISREG(attrs.permissions)) {
+ return filterFile(parent_path, filename, attrs);
+ } else if (LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+ return filterDirectory(parent_path, filename, attrs);
+ } else {
+ logger_->log_debug("Skipping non-regular, non-directory file \"%s/%s\"", parent_path.c_str(), filename.c_str());  // every other entry type is dropped
+ return false;
+ }
+}
+
+bool ListSFTP::filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {  // Returns true when a regular file passes the age, size and File Filter Regex checks.
+ if (!(attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) ||
+ !(attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) ||
+ !(attrs.flags & LIBSSH2_SFTP_ATTR_ACMODTIME)) {  // owner/group, size and access/modification time must all be present
+ // TODO: maybe do a fallback stat here
+ logger_->log_error("Failed to get all attributes in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+ return false;
+ }
+
+ /* Age */
+ time_t now = time(nullptr);
+ int64_t file_age = (now - attrs.mtime) * 1000;  // age in milliseconds; negative when the remote mtime is ahead of the local clock
+ if (file_age < minimum_file_age_) {  // NOTE(review): if minimum_file_age_ is unsigned, a negative file_age converts to a huge value in this comparison - confirm member type
+ logger_->log_debug("Ignoring \"%s/%s\" because it is younger than the Minimum File Age: %ld ms < %lu ms",
+ parent_path.c_str(),
+ filename.c_str(),
+ file_age,
+ minimum_file_age_);
+ return false;
+ }
+ if (maximum_file_age_ != 0U && file_age > maximum_file_age_) {  // a maximum of 0 means no upper age limit
+ logger_->log_debug("Ignoring \"%s/%s\" because it is older than the Maximum File Age: %ld ms > %lu ms",
+ parent_path.c_str(),
+ filename.c_str(),
+ file_age,
+ maximum_file_age_);
+ return false;
+ }
+
+ /* Size */
+ if (attrs.filesize < minimum_file_size_) {
+ logger_->log_debug("Ignoring \"%s/%s\" because it is smaller than the Minimum File Size: %lu B < %lu B",
+ parent_path.c_str(),
+ filename.c_str(),
+ attrs.filesize,
+ minimum_file_size_);
+ return false;
+ }
+ if (maximum_file_size_ != 0U && attrs.filesize > maximum_file_size_) {  // a maximum of 0 means no upper size limit
+ logger_->log_debug("Ignoring \"%s/%s\" because it is larger than the Maximum File Size: %lu B > %lu B",
+ parent_path.c_str(),
+ filename.c_str(),
+ attrs.filesize,
+ maximum_file_size_);
+ return false;
+ }
+
+ /* File Filter Regex */
+ if (file_filter_regex_set_) {  // only applied when onSchedule() compiled a regex successfully
+ bool match = false;
+#ifndef WIN32
+ int ret = regexec(&compiled_file_filter_regex_, filename.c_str(), static_cast<size_t>(0), nullptr, 0);  // no match offsets requested
+ match = ret == 0;
+#else
+ match = std::regex_match(filename, compiled_file_filter_regex_);  // NOTE(review): regex_match needs the whole name to match, while regexec above accepts a match anywhere - platforms may differ
+#endif
+ if (!match) {
+ logger_->log_debug("Ignoring \"%s/%s\" because it did not match the File Filter Regex \"%s\"",
+ parent_path.c_str(),
+ filename.c_str(),
+ file_filter_regex_);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool ListSFTP::filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {  // Returns true when a subdirectory should be kept for recursion; attrs is not used.
+ if (!search_recursively_) {  // without recursion no directory is ever kept
+ return false;
+ }
+
+ /* Path Filter Regex */
+ if (path_filter_regex_set_) {  // only applied when onSchedule() compiled a regex successfully
+ std::string dir_path = utils::file::FileUtils::concat_path(parent_path, filename, true /*force_posix*/);  // the regex is tested against the joined path, not the bare name
+ bool match = false;
+#ifndef WIN32
+ int ret = regexec(&compiled_path_filter_regex_, dir_path.c_str(), static_cast<size_t>(0), nullptr, 0);
+ match = ret == 0;
+#else
+ match = std::regex_match(dir_path, compiled_path_filter_regex_);
+#endif
+ if (!match) {
+ logger_->log_debug("Not recursing into \"%s\" because it did not match the Path Filter Regex \"%s\"",
+ dir_path.c_str(),
+ path_filter_regex_);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool ListSFTP::createAndTransferFlowFileFromChild(  // Creates a FlowFile describing one listed child and transfers it to Success; returns false only when the FlowFile cannot be created.
+ const std::shared_ptr<core::ProcessSession>& session,
+ const std::string& hostname,
+ uint16_t port,
+ const std::string& username,
+ const ListSFTP::Child& child) {
+ /* Convert mtime to string */
+ if (child.attrs.mtime > std::numeric_limits<int64_t>::max()) {  // guard the cast to int64_t below
+ logger_->log_error("Modification date %lu of \"%s/%s\" larger than int64_t max", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+ return true;  // the child is skipped without a FlowFile, yet true is returned
+ }
+ std::string mtime_str;
+ if (!getDateTimeStr(static_cast<int64_t>(child.attrs.mtime), mtime_str)) {
+ logger_->log_error("Failed to convert modification date %lu of \"%s/%s\" to string", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+ return true;  // same as above: skipped, but true is returned
+ }
+
+ /* Create FlowFile */
+ std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+ if (flow_file == nullptr) {
+ logger_->log_error("Failed to create FlowFileRecord");
+ return false;
+ }
+
+ /* Set attributes */
+ session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, hostname);
+ session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(port));
+ session->putAttribute(flow_file, ATTRIBUTE_SFTP_LISTING_USER, username);
+
+ /* uid and gid */
+ session->putAttribute(flow_file, ATTRIBUTE_FILE_OWNER, std::to_string(child.attrs.uid));  // numeric ids, written as decimal strings
+ session->putAttribute(flow_file, ATTRIBUTE_FILE_GROUP, std::to_string(child.attrs.gid));
+
+ /* permissions */
+ std::stringstream ss;
+ ss << std::setfill('0') << std::setw(4) << std::oct << (child.attrs.permissions & 0777);  // lower nine permission bits as zero-padded 4-digit octal
+ session->putAttribute(flow_file, ATTRIBUTE_FILE_PERMISSIONS, ss.str());
+
+ /* filesize */
+ session->putAttribute(flow_file, ATTRIBUTE_FILE_SIZE, std::to_string(child.attrs.filesize));
+
+ /* mtime */
+ session->putAttribute(flow_file, ATTRIBUTE_FILE_LASTMODIFIEDTIME, mtime_str);
+
+ flow_file->updateKeyedAttribute(FILENAME, child.filename);  // filename and path attributes mirror the remote name and its parent directory
+ flow_file->updateKeyedAttribute(PATH, child.parent_path);
+
+ session->transfer(flow_file, Success);
+
+ return true;
+}
+
+ListSFTP::ListedEntity::ListedEntity()  // Default entity: timestamp and size both zero.
+ : timestamp(0U)
+ , size(0U) {
+}
+
+ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)  // Entity with the given timestamp and size, stored unchanged.
+ : timestamp(timestamp_)
+ , size(size_) {
+}
+
+bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {  // Writes the Tracking Timestamps state as key=value lines; returns false when the file cannot be opened or written.
+ std::ofstream file(tracking_timestamps_state_filename_);
+ if (!file.is_open()) {
+ logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+ return false;
+ }
+ file << "hostname=" << hostname << "\n";  // hostname, username and remote_path let the loader reject state created with other settings
+ file << "username=" << username << "\n";
+ file << "remote_path=" << remote_path << "\n";
+ file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
+ file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+ size_t i = 0;
+ for (const auto& identifier : latest_identifiers_processed_) {
+ file << "id." << i++ << "=" << identifier << "\n";  // keys are id.0, id.1, ... in iteration order
+ }
+ file.close();  // flush now so that a failed write is visible in the stream state instead of being lost in the destructor
+ return !file.fail();
+}
+
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {  // Loads the Tracking Timestamps state file; returns false (leaving member state untouched) if it is unreadable, malformed or written for other settings.
+ std::ifstream file(tracking_timestamps_state_filename_);
+ if (!file.is_open()) {
+ logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+ return false;
+ }
+ std::string state_hostname;
+ std::string state_username;
+ std::string state_remote_path;
+ uint64_t state_listing_timestamp = 0U;  // zero-initialised so a file lacking the key never yields an indeterminate value
+ uint64_t state_processed_timestamp = 0U;
+ std::set<std::string> state_ids;
+ std::string line;
+ while (std::getline(file, line)) {
+ size_t separator_pos = line.find('=');
+ if (separator_pos == std::string::npos) {
+ logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str());
+ continue;  // skip the malformed line rather than treating the whole line as both key and value
+ }
+ std::string key = line.substr(0, separator_pos);
+ std::string value = line.substr(separator_pos + 1);
+ if (key == "hostname") {
+ state_hostname = std::move(value);
+ } else if (key == "username") {
+ state_username = std::move(value);
+ } else if (key == "remote_path") {
+ state_remote_path = std::move(value);
+ } else if (key == "listing.timestamp") {
+ try {
+ state_listing_timestamp = stoull(value);
+ } catch (...) {
+ logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+ return false;
+ }
+ } else if (key == "processed.timestamp") {
+ try {
+ state_processed_timestamp = stoull(value);
+ } catch (...) {
+ logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+ return false;
+ }
+ } else if (key.compare(0, strlen("id."), "id.") == 0) {  // any key starting with "id." carries one processed identifier
+ state_ids.emplace(std::move(value));
+ } else {
+ logger_->log_warn("Unknown key found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), key.c_str());
+ }
+ }
+ file.close();
+
+ if (state_hostname != hostname ||
+ state_username != username ||
+ state_remote_path != remote_path) {  // state written for a different host, user or path must not be reused
+ logger_->log_error("Tracking Timestamps state file \"%s\" was created with different settings than the current ones, ignoring. "
+ "Hostname: \"%s\" vs. \"%s\", "
+ "Username: \"%s\" vs. \"%s\", "
+ "Remote Path: \"%s\" vs. \"%s\"",
+ tracking_timestamps_state_filename_.c_str(),
+ state_hostname, hostname,
+ state_username, username,
+ state_remote_path, remote_path);
+ return false;
+ }
+
+ last_listed_latest_entry_timestamp_ = state_listing_timestamp;  // commit only after every check has passed
+ last_processed_latest_entry_timestamp_ = state_processed_timestamp;
+ latest_identifiers_processed_ = std::move(state_ids);
+
+ return true;
+}
+
+/**
+ * Emits FlowFiles for the files that are new according to the "Tracking Timestamps" listing strategy.
+ *
+ * Files are grouped by their modification time (converted to milliseconds). Files older than the last
+ * listed / last processed timestamps are skipped, and files carrying the newest timestamp of the current
+ * listing are held back until a precision-dependent listing lag has elapsed. After a successful run the
+ * in-memory state is updated and, if a state file is configured, persisted.
+ *
+ * @param context     used to yield the processor when there is nothing to do or an error happens
+ * @param session     session handed to createAndTransferFlowFileFromChild
+ * @param hostname    remote host; written to the FlowFiles and used to validate/persist the state file
+ * @param port        remote port; handed to createAndTransferFlowFileFromChild
+ * @param username    remote user; written to the FlowFiles and used to validate/persist the state file
+ * @param remote_path listed path; used to validate/persist the state file
+ * @param files       the files found by the current listing; their elements are moved from
+ */
+void ListSFTP::listByTrackingTimestamps(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    std::vector<Child>&& files) {
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_timestamps_state_filename_.empty()) {
+    if (updateFromTrackingTimestampsCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    } else {
+      logger_->log_debug("Failed to load Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  /*
+   * This must be read only after the state file has been loaded: a successful load overwrites
+   * last_listed_latest_entry_timestamp_. Reading it earlier made the first run after a restart treat
+   * every file as new, re-emitting files older than the restored timestamp.
+   */
+  const uint64_t min_timestamp_to_list = last_listed_latest_entry_timestamp_;
+
+  std::chrono::time_point<std::chrono::steady_clock> current_run_time = std::chrono::steady_clock::now();
+  time_t now = time(nullptr);
+
+  /* Order children by timestamp and try to detect timestamp precision if needed */
+  std::map<uint64_t /*timestamp*/, std::list<Child>> ordered_files;
+  bool target_system_has_seconds = false;
+  for (auto&& file : files) {
+    /* Widen before multiplying: mtime is an unsigned long, which is only 32 bits wide on some platforms */
+    uint64_t timestamp = static_cast<uint64_t>(file.attrs.mtime) * 1000;
+    target_system_has_seconds |= timestamp % 60000 != 0;
+
+    bool new_file = min_timestamp_to_list == 0U || (timestamp >= min_timestamp_to_list && timestamp >= last_processed_latest_entry_timestamp_);
+    if (new_file) {
+      auto& files_for_timestamp = ordered_files[timestamp];
+      files_for_timestamp.emplace_back(std::move(file));
+    } else {
+      logger_->log_trace("Skipping \"%s\", because it is not new.", file.getPath().c_str());
+    }
+  }
+
+  uint64_t latest_listed_entry_timestamp_this_cycle = 0U;
+  size_t flow_files_created = 0U;
+  if (!ordered_files.empty()) {
+    latest_listed_entry_timestamp_this_cycle = ordered_files.crbegin()->first;
+
+    std::string remote_system_timestamp_precision;
+    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT) {
+      if (target_system_has_seconds) {
+        logger_->log_debug("Precision auto detection detected second precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+      } else {
+        logger_->log_debug("Precision auto detection detected minute precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+      }
+    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES) {
+      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+    } else {
+      /*
+       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS makes no real sense here,
+       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
+       */
+      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+    }
+    uint64_t listing_lag = LISTING_LAG_MAP.at(remote_system_timestamp_precision);
+    logger_->log_debug("The listing lag is %lu ms", listing_lag);
+
+    /* If the latest listing time is equal to the last listing time, there are no entries with a newer timestamp than previously seen */
+    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_) {
+      const auto& latest_files = ordered_files.at(latest_listed_entry_timestamp_this_cycle);
+      uint64_t elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time - last_run_time_).count();
+      /* If a precision-specific listing lag has not yet elapsed since our last execution, we wait. */
+      if (elapsed_time < listing_lag) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
+                           "and the listing lag has not yet elapsed (%lu ms < %lu ms). Yielding.",
+                           latest_listed_entry_timestamp_this_cycle,
+                           elapsed_time,
+                           listing_lag);
+        context->yield();
+        return;
+      }
+      /*
+       * If we have already processed the entities with the newest timestamp,
+       * and there are no new entities with that timestamp, there is nothing to do.
+       */
+      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_ &&
+          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child) {
+            return latest_identifiers_processed_.count(child.getPath()) == 1U;
+          })) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
+                           "and all files for that timestamp has been processed. Yielding.", latest_listed_entry_timestamp_this_cycle);
+        context->yield();
+        return;
+      }
+    } else {
+      /* Determine the minimum reliable timestamp based on precision (computed in 64 bits regardless of the width of time_t) */
+      uint64_t minimum_reliable_timestamp = static_cast<uint64_t>(now) * 1000 - listing_lag;
+      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS) {
+        minimum_reliable_timestamp -= minimum_reliable_timestamp % 1000;
+      } else {
+        minimum_reliable_timestamp -= minimum_reliable_timestamp % 60000;
+      }
+      /* If the latest timestamp is not old enough, we wait another cycle */
+      if (minimum_reliable_timestamp < latest_listed_entry_timestamp_this_cycle) {
+        logger_->log_debug("Skipping files with latest timestamp because their modification date is not smaller than the minimum reliable timestamp: %lu ms >= %lu ms",
+                           latest_listed_entry_timestamp_this_cycle,
+                           minimum_reliable_timestamp);
+        ordered_files.erase(latest_listed_entry_timestamp_this_cycle);
+      }
+    }
+
+    for (auto& files_for_timestamp : ordered_files) {
+      if (files_for_timestamp.first == last_processed_latest_entry_timestamp_) {
+        /* Filter out previously processed entities. */
+        for (auto it = files_for_timestamp.second.begin(); it != files_for_timestamp.second.end();) {
+          if (latest_identifiers_processed_.count(it->getPath()) != 0U) {
+            it = files_for_timestamp.second.erase(it);
+          } else {
+            ++it;
+          }
+        }
+      }
+      for (const auto& file : files_for_timestamp.second) {
+        /* Create the FlowFile for this path */
+        if (createAndTransferFlowFileFromChild(session, hostname, port, username, file)) {
+          flow_files_created++;
+        } else {
+          logger_->log_error("Failed to emit FlowFile for \"%s\"", file.filename);
+          context->yield();
+          return;
+        }
+      }
+    }
+  }
+
+  /* If we have a listing timestamp, it is worth persisting the state */
+  if (latest_listed_entry_timestamp_this_cycle != 0U) {
+    bool processed_new_files = flow_files_created > 0U;
+    if (processed_new_files) {
+      /* flow_files_created > 0 implies that ordered_files is not empty, so crbegin() is valid here */
+      auto last_files_it = ordered_files.crbegin();
+      if (last_files_it->first != last_processed_latest_entry_timestamp_) {
+        latest_identifiers_processed_.clear();
+      }
+
+      for (const auto& last_file : last_files_it->second) {
+        latest_identifiers_processed_.insert(last_file.getPath());
+      }
+
+      last_processed_latest_entry_timestamp_ = last_files_it->first;
+    }
+
+    last_run_time_ = current_run_time;
+
+    if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
+      last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
+      if (!tracking_timestamps_state_filename_.empty()) {
+        persistTrackingTimestampsCache(hostname, username, remote_path);
+      }
+    }
+  } else {
+    logger_->log_debug("There are no files to list. Yielding.");
+    context->yield();
+    return;
+  }
+}
+
+/**
+ * Persists the "Tracking Entities" state to disk.
+ *
+ * Two files are written: a key=value state file (hostname, username, remote_path and the path of the
+ * json state file), and a json file containing one object per entry of already_listed_entities_,
+ * keyed by the entry's key, with "timestamp" and "size" members.
+ *
+ * @param hostname    hostname the state belongs to
+ * @param username    username the state belongs to
+ * @param remote_path remote path the state belongs to
+ * @return true only if both files could be opened and written without a stream error
+ */
+bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ofstream file(tracking_entities_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+  file << "hostname=" << hostname << "\n";
+  file << "username=" << username << "\n";
+  file << "remote_path=" << remote_path << "\n";
+  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
+  file.close();
+  /* Buffered write errors (e.g. a full disk) only surface on flush/close, so check the stream state afterwards */
+  if (file.fail()) {
+    logger_->log_error("Failed to write Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+
+  std::ofstream json_file(tracking_entities_state_json_filename_);
+  if (!json_file.is_open()) {
+    logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str());
+    return false;
+  }
+
+  rapidjson::Document entities(rapidjson::kObjectType);
+  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
+  for (const auto& already_listed_entity : already_listed_entities_) {
+    rapidjson::Value entity(rapidjson::kObjectType);
+    entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc);
+    entity.AddMember("size", already_listed_entity.second.size, alloc);
+    /* The key is copied with the document's allocator, so it does not reference the map's string */
+    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc);
+  }
+
+  rapidjson::OStreamWrapper osw(json_file);
+  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
+  const bool serialized = entities.Accept(writer);
+  json_file.close();
+  if (!serialized || json_file.fail()) {
+    logger_->log_error("Failed to write Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str());
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * Restores the "Tracking Entities" state from disk.
+ *
+ * First the key=value state file is read and its hostname, username and remote_path are compared with
+ * the current settings; on any mismatch the state is ignored. Then the json state file referenced by
+ * the "json_state_file" key is parsed. Entries that are not objects, or whose "timestamp" or "size"
+ * is missing or not an unsigned integer, are logged and skipped. already_listed_entities_ is only
+ * replaced once the whole json file has been processed.
+ *
+ * @param hostname    current hostname, must match the one stored in the state file
+ * @param username    current username, must match the one stored in the state file
+ * @param remote_path current remote path, must match the one stored in the state file
+ * @return true if the state was loaded, false if a file could not be opened or parsed, or the settings differ
+ */
+bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ifstream file(tracking_entities_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+  std::string state_hostname;
+  std::string state_username;
+  std::string state_remote_path;
+  std::string state_json_state_file;
+
+  std::string line;
+  while (std::getline(file, line)) {
+    size_t separator_pos = line.find('=');
+    if (separator_pos == std::string::npos) {
+      logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
+      continue;
+    }
+    /* Only the first '=' separates key and value, so values may contain '=' themselves */
+    std::string key = line.substr(0, separator_pos);
+    std::string value = line.substr(separator_pos + 1);
+    if (key == "hostname") {
+      state_hostname = std::move(value);
+    } else if (key == "username") {
+      state_username = std::move(value);
+    } else if (key == "remote_path") {
+      state_remote_path = std::move(value);
+    } else if (key == "json_state_file") {
+      state_json_state_file = std::move(value);
+    } else {
+      logger_->log_warn("Unknown key found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
+    }
+  }
+  file.close();
+
+  if (state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. "
+                       "Hostname: \"%s\" vs. \"%s\", "
+                       "Username: \"%s\" vs. \"%s\", "
+                       "Remote Path: \"%s\" vs. \"%s\"",
+                       tracking_entities_state_filename_.c_str(),
+                       state_hostname, hostname,
+                       state_username, username,
+                       state_remote_path, remote_path);
+    return false;
+  }
+
+  if (state_json_state_file.empty()) {
+    logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+
+  std::ifstream json_file(state_json_state_file);
+  if (!json_file.is_open()) {
+    logger_->log_error("Failed to open entities Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+    return false;
+  }
+
+  try {
+    rapidjson::IStreamWrapper isw(json_file);
+    rapidjson::Document d;
+    rapidjson::ParseResult res = d.ParseStream(isw);
+    if (!res) {
+      logger_->log_error("Failed to parse Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+      return false;
+    }
+    if (!d.IsObject()) {
+      logger_->log_error("Tracking Entities state json file \"%s\" root is not an object", state_json_state_file.c_str());
+      return false;
+    }
+
+    std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
+    for (const auto &already_listed_entity : d.GetObject()) {
+      /* FindMember() may only be called on object values, so reject anything else up front */
+      if (!already_listed_entity.value.IsObject()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" entry for entity \"%s\" is not an object",
+                           state_json_state_file.c_str(),
+                           already_listed_entity.name.GetString());
+        continue;
+      }
+      auto it = already_listed_entity.value.FindMember("timestamp");
+      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" timestamp missing or malformatted for entity \"%s\"",
+                           state_json_state_file.c_str(),
+                           already_listed_entity.name.GetString());
+        continue;
+      }
+      uint64_t timestamp = it->value.GetUint64();
+      it = already_listed_entity.value.FindMember("size");
+      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" size missing or malformatted for entity \"%s\"",
+                           state_json_state_file.c_str(),
+                           already_listed_entity.name.GetString());
+        continue;
+      }
+      uint64_t size = it->value.GetUint64();
+      new_already_listed_entities.emplace(std::piecewise_construct,
+                                          std::forward_as_tuple(already_listed_entity.name.GetString()),
+                                          std::forward_as_tuple(timestamp, size));
+    }
+    /* Replace the in-memory state only after the whole file has been processed */
+    already_listed_entities_ = std::move(new_already_listed_entities);
+  } catch (const std::exception& e) {
+    logger_->log_error("Exception while parsing Tracking Entities state json file \"%s\": %s", state_json_state_file.c_str(), e.what());
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * Emits FlowFiles for the files that are new or changed according to the "Tracking Entities" listing strategy.
+ *
+ * A file is emitted if its path is not in already_listed_entities_, or if its modification time is
+ * newer or its size differs from the tracked values. Files older than the tracking time window are
+ * ignored, and tracked entities that fell out of the window are dropped from the state. The state is
+ * persisted afterwards if a state file is configured.
+ *
+ * @param context                     used to yield the processor when there is nothing to do or an error happens
+ * @param session                     session handed to createAndTransferFlowFileFromChild
+ * @param hostname                    remote host; written to the FlowFiles and used to validate/persist the state file
+ * @param port                        remote port; handed to createAndTransferFlowFileFromChild
+ * @param username                    remote user; written to the FlowFiles and used to validate/persist the state file
+ * @param remote_path                 listed path; used to validate/persist the state file
+ * @param entity_tracking_time_window length of the tracking window in milliseconds
+ * @param files                       the files found by the current listing; modified and moved from
+ */
+void ListSFTP::listByTrackingEntities(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    uint64_t entity_tracking_time_window,
+    std::vector<Child>&& files) {
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_entities_state_filename_.empty()) {
+    if (updateFromTrackingEntitiesCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+      initial_listing_complete_ = true;
+    } else {
+      logger_->log_debug("Failed to load Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  /*
+   * All timestamps are handled as 64 bit milliseconds. The operands are widened before multiplying,
+   * because time_t and the unsigned long mtime are only 32 bits wide on some platforms.
+   */
+  const auto to_millis = [](unsigned long seconds) -> uint64_t {
+    return static_cast<uint64_t>(seconds) * 1000;
+  };
+
+  time_t now = time(nullptr);
+  uint64_t min_timestamp_to_list = (!initial_listing_complete_ && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)
+      ? 0U : (static_cast<uint64_t>(now) * 1000 - entity_tracking_time_window);
+
+  /* Skip files not in the tracking window (single pass instead of erasing from the middle of the vector one by one) */
+  files.erase(std::remove_if(files.begin(), files.end(), [&](const Child& file) {
+    const uint64_t timestamp = to_millis(file.attrs.mtime);
+    if (timestamp < min_timestamp_to_list) {
+      logger_->log_trace("Skipping \"%s\" because it has an older timestamp than the minimum timestamp to list: %lu < %lu",
+                         file.getPath(), timestamp, min_timestamp_to_list);
+      return true;
+    }
+    return false;
+  }), files.end());
+
+  if (files.empty()) {
+    logger_->log_debug("No entities to list within the tracking time window");
+    context->yield();
+    return;
+  }
+
+  /* Decides whether a file is new or has changed compared to the tracked state */
+  const auto is_new_or_updated = [&](const Child& child) {
+    auto already_listed_it = already_listed_entities_.find(child.getPath());
+    if (already_listed_it == already_listed_entities_.end()) {
+      logger_->log_trace("Found new file \"%s\"", child.getPath());
+      return true;
+    }
+
+    const uint64_t timestamp = to_millis(child.attrs.mtime);
+    if (timestamp > already_listed_it->second.timestamp) {
+      logger_->log_trace("Found file \"%s\" with newer timestamp: %lu -> %lu",
+                         child.getPath(),
+                         already_listed_it->second.timestamp,
+                         timestamp);
+      return true;
+    }
+
+    if (child.attrs.filesize != already_listed_it->second.size) {
+      logger_->log_trace("Found file \"%s\" with different size: %lu -> %lu",
+                         child.getPath(),
+                         already_listed_it->second.size,
+                         child.attrs.filesize);
+      return true;
+    }
+
+    logger_->log_trace("Skipping file \"%s\" because it has not changed", child.getPath());
+    return false;
+  };
+
+  /* Find files that have been updated; they are moved out of files, which is not used afterwards */
+  std::vector<Child> updated_entities;
+  for (auto& file : files) {
+    if (is_new_or_updated(file)) {
+      updated_entities.emplace_back(std::move(file));
+    }
+  }
+
+  /* Find entities in the tracking cache that are no longer in the tracking window */
+  std::vector<std::string> old_entity_ids;
+  for (const auto& already_listed_entity : already_listed_entities_) {
+    if (already_listed_entity.second.timestamp < min_timestamp_to_list) {
+      old_entity_ids.emplace_back(already_listed_entity.first);
+    }
+  }
+
+  /* If we have no new files and no expired tracked entities, we have nothing to do */
+  if (updated_entities.empty() && old_entity_ids.empty()) {
+    context->yield();
+    return;
+  }
+
+  /* Remove expired entities */
+  for (const auto& old_entity_id : old_entity_ids) {
+    already_listed_entities_.erase(old_entity_id);
+  }
+
+  for (const auto& updated_entity : updated_entities) {
+    /* Create the FlowFile for this path */
+    if (!createAndTransferFlowFileFromChild(session, hostname, port, username, updated_entity)) {
+      logger_->log_error("Failed to emit FlowFile for \"%s\"", updated_entity.getPath());
+      context->yield();
+      return;
+    }
+    already_listed_entities_[updated_entity.getPath()] = ListedEntity(to_millis(updated_entity.attrs.mtime), updated_entity.attrs.filesize);
+  }
+
+  initial_listing_complete_ = true;
+
+  if (!tracking_entities_state_filename_.empty()) {
+    persistTrackingEntitiesCache(hostname, username, remote_path);
+  }
+}
+
+/**
+ * Runs one listing cycle.
+ *
+ * Reads the connection and listing properties, invalidates the cached state if the hostname, username
+ * or remote path changed since the previous run, obtains an SFTP connection, walks the remote
+ * directories starting at Remote Path and hands the collected files to the configured listing strategy.
+ * The connection is handed back to the connection cache only if the end of the cycle is reached.
+ *
+ * @param context used to read properties and to yield the processor
+ * @param session session handed to the listing strategy
+ */
+void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  /* Connection related properties shared by the SFTP processors */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, nullptr /*flow_file*/, common_properties)) {
+    context->yield();
+    return;
+  }
+
+  /* Remote Path without trailing slashes (a path consisting of a single character is left alone) */
+  std::string remote_path;
+  context->getProperty(RemotePath.getName(), remote_path);
+  while (remote_path.size() > 1U && remote_path.back() == '/') {
+    remote_path.pop_back();
+  }
+
+  /* Entity Tracking Time Window in milliseconds, 3 hours if it is not set or cannot be parsed */
+  const uint64_t default_entity_tracking_time_window = 3 * 3600 * 1000;
+  uint64_t entity_tracking_time_window = 0U;
+  std::string time_window_str;
+  if (!context->getProperty(EntityTrackingTimeWindow.getName(), time_window_str)) {
+    entity_tracking_time_window = default_entity_tracking_time_window;
+  } else {
+    core::TimeUnit unit;
+    const bool parsed = core::Property::StringToTime(time_window_str, entity_tracking_time_window, unit) &&
+        core::Property::ConvertTimeUnitToMS(entity_tracking_time_window, unit, entity_tracking_time_window);
+    if (!parsed) {
+      logger_->log_error("Entity Tracking Time Window attribute is invalid");
+      entity_tracking_time_window = default_entity_tracking_time_window;
+    }
+  }
+
+  /* A change of target invalidates the cached state; an empty "last" value means there was no previous run to compare to */
+  const bool hostname_changed = !last_hostname_.empty() && last_hostname_ != common_properties.hostname;
+  const bool username_changed = !last_username_.empty() && last_username_ != common_properties.username;
+  const bool remote_path_changed = !last_remote_path_.empty() && last_remote_path_ != remote_path;
+  if (hostname_changed || username_changed || remote_path_changed) {
+    invalidateCache();
+  }
+  last_hostname_ = common_properties.hostname;
+  last_username_ = common_properties.username;
+  last_remote_path_ = remote_path;
+
+  /* Take a connection from the cache, or create a new one */
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
+  if (client == nullptr) {
+    context->yield();
+    return;
+  }
+
+  /* Directories still to be listed, processed in first-in-first-out order, and the files found so far */
+  std::deque<Child> pending_directories;
+  std::vector<Child> files;
+
+  /* The walk starts at the Remote Path itself */
+  {
+    Child root;
+    std::tie(root.parent_path, root.filename) = utils::file::FileUtils::split_path(remote_path, true /*force_posix*/);
+    root.directory = true;
+    pending_directories.emplace_back(std::move(root));
+  }
+
+  while (!pending_directories.empty()) {
+    const Child directory = std::move(pending_directories.front());
+    pending_directories.pop_front();
+
+    const std::string directory_path = directory.parent_path.empty() ? directory.filename : directory.getPath();
+
+    std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>> entries;
+    if (!client->listDirectory(directory_path, follow_symlink_, entries)) {
+      /* A directory that cannot be listed is skipped, the rest of the walk continues */
+      continue;
+    }
+    for (auto&& entry : entries) {
+      if (!filter(directory_path, entry)) {
+        continue;
+      }
+      Child child(directory_path, std::move(entry));
+      if (child.directory) {
+        pending_directories.emplace_back(std::move(child));
+      } else {
+        files.emplace_back(std::move(child));
+      }
+    }
+  }
+
+  /* Process the files with the appropriate tracking strategy */
+  const bool track_timestamps = listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  const bool track_entities = !track_timestamps && listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES;
+  if (!track_timestamps && !track_entities) {
+    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+    context->yield();
+    return;
+  }
+  if (track_timestamps) {
+    listByTrackingTimestamps(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path, std::move(files));
+  } else {
+    listByTrackingEntities(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path, entity_tracking_time_window, std::move(files));
+  }
+
+  /*
+   * The connection is only put back to the cache here, at the end of a cycle that was not aborted by
+   * one of the early returns above.
+   */
+  addConnectionToCache(connection_cache_key, std::move(client));
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
new file mode 100644
index 0000000..4fe32e2
--- /dev/null
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LIST_SFTP_H__
+#define __LIST_SFTP_H__
+
+#include <memory>
+#include <string>
+#include <map>
+#include <chrono>
+#include <cstdint>
+#ifndef WIN32
+#include <regex.h>
+#else
+#include <regex>
+#endif
+
+#include "SFTPProcessorBase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Processor that lists the files found on an SFTP server and creates a FlowFile for each new one.
+ *
+ * Two listing strategies are declared: "Tracking Timestamps" and "Tracking Entities". Both keep
+ * in-memory state between runs and have persist/update helpers for storing that state in files.
+ */
+class ListSFTP : public SFTPProcessorBase {
+ public:
+  // Listing strategy names
+  static constexpr const char* LISTING_STRATEGY_TRACKING_TIMESTAMPS = "Tracking Timestamps";
+  static constexpr const char* LISTING_STRATEGY_TRACKING_ENTITIES = "Tracking Entities";
+
+  // Target system timestamp precision names
+  static constexpr const char* TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT = "Auto Detect";
+  static constexpr const char* TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS = "Milliseconds";
+  static constexpr const char* TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS = "Seconds";
+  static constexpr const char* TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES = "Minutes";
+
+  // Entity tracking initial listing target names
+  static constexpr const char* ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW = "Tracking Time Window";
+  static constexpr const char* ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE = "All Available";
+
+  static constexpr const char* ProcessorName = "ListSFTP";
+
+  /*!
+   * Create a new processor
+   * @param name name of the processor
+   * @param uuid identifier of the processor, a default constructed one if omitted
+   */
+  ListSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
+  virtual ~ListSFTP();
+
+  // Supported Properties
+  static core::Property ListingStrategy;
+  static core::Property RemotePath;
+  static core::Property SearchRecursively;
+  static core::Property FollowSymlink;
+  static core::Property FileFilterRegex;
+  static core::Property PathFilterRegex;
+  static core::Property IgnoreDottedFiles;
+  static core::Property TargetSystemTimestampPrecision;
+  static core::Property EntityTrackingTimeWindow;
+  static core::Property EntityTrackingInitialListingTarget;
+  static core::Property MinimumFileAge;
+  static core::Property MaximumFileAge;
+  static core::Property MinimumFileSize;
+  static core::Property MaximumFileSize;
+  static core::Property StateFile;
+
+  // Supported Relationships
+  static core::Relationship Success;
+
+  // Writes Attributes
+  static constexpr const char* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
+  static constexpr const char* ATTRIBUTE_SFTP_REMOTE_PORT = "sftp.remote.port";
+  static constexpr const char* ATTRIBUTE_SFTP_LISTING_USER = "sftp.listing.user";
+  static constexpr const char* ATTRIBUTE_FILE_OWNER = "file.owner";
+  static constexpr const char* ATTRIBUTE_FILE_GROUP = "file.group";
+  static constexpr const char* ATTRIBUTE_FILE_PERMISSIONS = "file.permissions";
+  static constexpr const char* ATTRIBUTE_FILE_SIZE = "file.size";
+  static constexpr const char* ATTRIBUTE_FILE_LASTMODIFIEDTIME = "file.lastModifiedTime";
+
+  // Listing lag in milliseconds, looked up by timestamp precision name
+  static const std::map<std::string, uint64_t> LISTING_LAG_MAP;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ private:
+  // Configuration (presumably read from the properties when the processor is scheduled - TODO confirm)
+  std::string listing_strategy_;
+  bool search_recursively_;
+  bool follow_symlink_;
+  std::string file_filter_regex_;
+  std::string path_filter_regex_;
+  bool file_filter_regex_set_;
+  bool path_filter_regex_set_;
+  // POSIX regex is used everywhere except on Windows, where std::regex is used instead
+#ifndef WIN32
+  regex_t compiled_file_filter_regex_;
+  regex_t compiled_path_filter_regex_;
+#else
+  std::regex compiled_file_filter_regex_;
+  std::regex compiled_path_filter_regex_;
+#endif
+  bool ignore_dotted_files_;
+  std::string target_system_timestamp_precision_;
+  std::string entity_tracking_initial_listing_target_;
+  uint64_t minimum_file_age_;
+  uint64_t maximum_file_age_;
+  uint64_t minimum_file_size_;
+  uint64_t maximum_file_size_;
+
+  // Settings seen by the previous run, used to detect configuration changes
+  std::string last_listing_strategy_;
+  std::string last_hostname_;
+  std::string last_username_;
+  std::string last_remote_path_;
+
+  /**
+   * A directory entry found on the remote system.
+   */
+  struct Child {
+    Child();
+    Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child);
+    std::string getPath() const;
+
+    bool directory;
+    std::string parent_path;
+    std::string filename;
+    LIBSSH2_SFTP_ATTRIBUTES attrs;
+  };
+
+  // Set once loading the persisted state has been attempted, so that it is tried only once
+  bool already_loaded_from_cache_;
+
+  // State of the Tracking Timestamps strategy, timestamps are in milliseconds
+  std::string tracking_timestamps_state_filename_;
+  std::chrono::steady_clock::time_point last_run_time_;
+  uint64_t last_listed_latest_entry_timestamp_;
+  uint64_t last_processed_latest_entry_timestamp_;
+  // Paths of the files already processed for last_processed_latest_entry_timestamp_
+  std::set<std::string> latest_identifiers_processed_;
+
+  // State of the Tracking Entities strategy
+  bool initial_listing_complete_;
+  std::string tracking_entities_state_filename_;
+  std::string tracking_entities_state_json_filename_;
+  /**
+   * Timestamp (in milliseconds) and size recorded for an already listed file.
+   */
+  struct ListedEntity {
+    uint64_t timestamp;
+    uint64_t size;
+
+    ListedEntity();
+    ListedEntity(uint64_t timestamp, uint64_t size);
+  };
+  // Already listed files, keyed by their path
+  std::unordered_map<std::string, ListedEntity> already_listed_entities_;
+
+  void invalidateCache();
+
+  bool filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child);
+  bool filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
+  bool filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
+
+  /**
+   * Creates a FlowFile for the given child.
+   * @return true on success, false if the FlowFile could not be emitted
+   */
+  bool createAndTransferFlowFileFromChild(
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const Child& child);
+
+  // Store / restore the Tracking Timestamps state; hostname, username and remote_path identify the listing it belongs to
+  bool persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+
+  // Store / restore the Tracking Entities state; hostname, username and remote_path identify the listing it belongs to
+  bool persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+
+  // Processes the listed files with the Tracking Timestamps strategy
+  void listByTrackingTimestamps(
+      const std::shared_ptr<core::ProcessContext>& context,
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const std::string& remote_path,
+      std::vector<Child>&& files);
+
+  // Processes the listed files with the Tracking Entities strategy; entity_tracking_time_window is in milliseconds
+  void listByTrackingEntities(
+      const std::shared_ptr<core::ProcessContext>& context,
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const std::string& remote_path,
+      uint64_t entity_tracking_time_window,
+      std::vector<Child>&& files);
+};
+
+/* Registers ListSFTP together with its description text (REGISTER_RESOURCE is defined outside this file, presumably in core/Resource.h - TODO confirm). */
+REGISTER_RESOURCE(ListSFTP, "Performs a listing of the files residing on an SFTP server. "
+ "For each file that is found on the remote server, a new FlowFile will be created with "
+ "the filename attribute set to the name of the file on the remote server. "
+ "This can then be used in conjunction with FetchSFTP in order to fetch those files.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index e1b589d..3262feb 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -42,6 +42,7 @@
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
namespace org {
namespace apache {
@@ -49,30 +50,12 @@
namespace minifi {
namespace processors {
-core::Property PutSFTP::Hostname(
- core::PropertyBuilder::createProperty("Hostname")->withDescription("The fully qualified hostname or IP address of the remote system")
- ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Port(
- core::PropertyBuilder::createProperty("Port")->withDescription("The port that the remote system is listening on for file transfers")
- ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Username(
- core::PropertyBuilder::createProperty("Username")->withDescription("Username")
- ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Password(
- core::PropertyBuilder::createProperty("Password")->withDescription("Password for the user account")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::PrivateKeyPath(
- core::PropertyBuilder::createProperty("Private Key Path")->withDescription("The fully qualified path to the Private Key file")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::PrivateKeyPassphrase(
- core::PropertyBuilder::createProperty("Private Key Passphrase")->withDescription("Password for the private key")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
core::Property PutSFTP::RemotePath(
core::PropertyBuilder::createProperty("Remote Path")->withDescription("The path on the remote system from which to pull or push files")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PutSFTP::CreateDirectory(
core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
- ->withDefaultValue<bool>(false)->build());
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
core::Property PutSFTP::DisableDirectoryListing(
core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("If set to 'true', directory listing is not performed prior to create missing directories. "
"By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
@@ -80,18 +63,13 @@
"Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
"Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
"then an error is returned because the directory already exists.")
- ->isRequired(false)->withDefaultValue<bool>(false)->build());
+ ->isRequired(false)->withDefaultValue<bool>(false)->build());
core::Property PutSFTP::BatchSize(
core::PropertyBuilder::createProperty("Batch Size")->withDescription("The maximum number of FlowFiles to send in a single connection")
- ->withDefaultValue<uint64_t>(500)->build());
-core::Property PutSFTP::ConnectionTimeout(
- core::PropertyBuilder::createProperty("Connection Timeout")->withDescription("Amount of time to wait before timing out while creating a connection")
- ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
-core::Property PutSFTP::DataTimeout(
- core::PropertyBuilder::createProperty("Data Timeout")->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
- ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+ ->isRequired(true)->withDefaultValue<uint64_t>(500)->build());
core::Property PutSFTP::ConflictResolution(
core::PropertyBuilder::createProperty("Conflict Resolution")->withDescription("Determines how to handle the problem of filename collisions")
+ ->isRequired(true)
->withAllowableValues<std::string>({CONFLICT_RESOLUTION_REPLACE,
CONFLICT_RESOLUTION_IGNORE,
CONFLICT_RESOLUTION_RENAME,
@@ -109,63 +87,34 @@
core::Property PutSFTP::TempFilename(
core::PropertyBuilder::createProperty("Temporary Filename")->withDescription("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful completion will be renamed to the original filename. "
"If this value is set, the Dot Rename property is ignored.")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HostKeyFile(
- core::PropertyBuilder::createProperty("Host Key File")->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
- ->isRequired(false)->build());
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PutSFTP::LastModifiedTime(
core::PropertyBuilder::createProperty("Last Modified Time")->withDescription("The lastModifiedTime to assign to the file after transferring it. "
"If not set, the lastModifiedTime will not be changed. "
"Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
"You may also use expression language such as ${file.lastModifiedTime}. "
"If the value is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PutSFTP::Permissions(
core::PropertyBuilder::createProperty("Permissions")->withDescription("The permissions to assign to the file after transferring it. "
"Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). "
"If not set, the permissions will not be changed. "
"You may also use expression language such as ${file.permissions}. "
"If the value is invalid, the processor will not be invalid but will fail to change permissions of the file.")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PutSFTP::RemoteOwner(
core::PropertyBuilder::createProperty("Remote Owner")->withDescription("Integer value representing the User ID to set on the file after transferring it. "
"If not set, the owner will not be set. You may also use expression language such as ${file.owner}. "
"If the value is invalid, the processor will not be invalid but will fail to change the owner of the file.")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PutSFTP::RemoteGroup(
core::PropertyBuilder::createProperty("Remote Group")->withDescription("Integer value representing the Group ID to set on the file after transferring it. "
"If not set, the group will not be set. You may also use expression language such as ${file.group}. "
"If the value is invalid, the processor will not be invalid but will fail to change the group of the file.")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::StrictHostKeyChecking(
- core::PropertyBuilder::createProperty("Strict Host Key Checking")->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
- ->withDefaultValue<bool>(false)->build());
-core::Property PutSFTP::UseKeepaliveOnTimeout(
- core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
- ->withDefaultValue<bool>(true)->build());
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
core::Property PutSFTP::UseCompression(
core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
- ->withDefaultValue<bool>(false)->build());
-core::Property PutSFTP::ProxyType(
- core::PropertyBuilder::createProperty("Proxy Type")->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
- "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
- ->isRequired(false)
- ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
- PROXY_TYPE_HTTP,
- PROXY_TYPE_SOCKS})
- ->withDefaultValue(PROXY_TYPE_DIRECT)->build());
-core::Property PutSFTP::ProxyHost(
- core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The fully qualified hostname or IP address of the proxy server")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::ProxyPort(
- core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HttpProxyUsername(
- core::PropertyBuilder::createProperty("Http Proxy Username")->withDescription("Http Proxy Username")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HttpProxyPassword(
- core::PropertyBuilder::createProperty("Http Proxy Password")->withDescription("Http Proxy Password")
- ->supportsExpressionLanguage(true)->isRequired(false)->build());
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
core::Relationship PutSFTP::Success("success", "FlowFiles that are successfully sent will be routed to success");
core::Relationship PutSFTP::Reject("reject", "FlowFiles that were rejected by the destination system");
@@ -176,35 +125,20 @@
// Set the supported properties
std::set<core::Property> properties;
- properties.insert(Hostname);
- properties.insert(Port);
- properties.insert(Username);
- properties.insert(Password);
- properties.insert(PrivateKeyPath);
- properties.insert(PrivateKeyPassphrase);
+ addSupportedCommonProperties(properties);
properties.insert(RemotePath);
properties.insert(CreateDirectory);
properties.insert(DisableDirectoryListing);
properties.insert(BatchSize);
- properties.insert(ConnectionTimeout);
- properties.insert(DataTimeout);
properties.insert(ConflictResolution);
properties.insert(RejectZeroByte);
properties.insert(DotRename);
properties.insert(TempFilename);
- properties.insert(HostKeyFile);
properties.insert(LastModifiedTime);
properties.insert(Permissions);
properties.insert(RemoteOwner);
properties.insert(RemoteGroup);
- properties.insert(StrictHostKeyChecking);
- properties.insert(UseKeepaliveOnTimeout);
properties.insert(UseCompression);
- properties.insert(ProxyType);
- properties.insert(ProxyHost);
- properties.insert(ProxyPort);
- properties.insert(HttpProxyUsername);
- properties.insert(HttpProxyPassword);
setSupportedProperties(properties);
// Set the supported relationships
@@ -215,143 +149,21 @@
setSupportedRelationships(relationships);
}
-constexpr size_t PutSFTP::CONNECTION_CACHE_MAX_SIZE;
-
PutSFTP::PutSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
- : Processor(name, uuid),
- logger_(logging::LoggerFactory<PutSFTP>::getLogger()),
+ : SFTPProcessorBase(name, uuid),
create_directory_(false),
batch_size_(0),
- connection_timeout_(0),
- data_timeout_(0),
reject_zero_byte_(false),
- dot_rename_(false),
- strict_host_checking_(false),
- use_keepalive_on_timeout_(false),
- use_compression_(false),
- running_(true) {
+ dot_rename_(false) {
+ logger_ = logging::LoggerFactory<PutSFTP>::getLogger();
}
PutSFTP::~PutSFTP() {
- if (keepalive_thread_.joinable()) {
- {
- std::lock_guard<std::mutex> lock(connections_mutex_);
- running_ = false;
- keepalive_cv_.notify_one();
- }
- keepalive_thread_.join();
- }
-}
-
-bool PutSFTP::ConnectionCacheKey::operator<(const PutSFTP::ConnectionCacheKey& other) const {
- return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port) <
- std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port);
-}
-
-bool PutSFTP::ConnectionCacheKey::operator==(const PutSFTP::ConnectionCacheKey& other) const {
- return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port) ==
- std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port);
-}
-
-std::unique_ptr<utils::SFTPClient> PutSFTP::getConnectionFromCache(const PutSFTP::ConnectionCacheKey& key) {
- std::lock_guard<std::mutex> lock(connections_mutex_);
-
- auto it = connections_.find(key);
- if (it == connections_.end()) {
- return nullptr;
- }
-
- logger_->log_debug("Removing %s@%s:%hu from SFTP connection pool",
- key.username,
- key.hostname,
- key.port);
-
- auto lru_it = std::find(lru_.begin(), lru_.end(), key);
- if (lru_it == lru_.end()) {
- logger_->log_trace("Assertion error: can't find key in LRU cache");
- } else {
- lru_.erase(lru_it);
- }
-
- auto connection = std::move(it->second);
- connections_.erase(it);
- return connection;
-}
-
-void PutSFTP::addConnectionToCache(const PutSFTP::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
- std::lock_guard<std::mutex> lock(connections_mutex_);
-
- while (connections_.size() >= PutSFTP::CONNECTION_CACHE_MAX_SIZE) {
- const auto& lru_key = lru_.back();
- logger_->log_debug("SFTP connection pool is full, removing %s@%s:%hu",
- lru_key.username,
- lru_key.hostname,
- lru_key.port);
- connections_.erase(lru_key);
- lru_.pop_back();
- }
-
- logger_->log_debug("Adding %s@%s:%hu to SFTP connection pool",
- key.username,
- key.hostname,
- key.port);
- connections_.emplace(key, std::move(connection));
- lru_.push_front(key);
- keepalive_cv_.notify_one();
-}
-
-void PutSFTP::keepaliveThreadFunc() {
- std::unique_lock<std::mutex> lock(connections_mutex_);
-
- while (true) {
- if (connections_.empty()) {
- keepalive_cv_.wait(lock, [this] {
- return !running_ || !connections_.empty();
- });
- }
- if (!running_) {
- logger_->log_trace("Stopping keepalive thread");
- lock.unlock();
- return;
- }
-
- int min_wait = 10;
- for (auto &connection : connections_) {
- int seconds_to_next = 0;
- if (connection.second->sendKeepAliveIfNeeded(seconds_to_next)) {
- logger_->log_debug("Sent keepalive to %s@%s:%hu if needed, next keepalive in %d s",
- connection.first.username,
- connection.first.hostname,
- connection.first.port,
- seconds_to_next);
- if (seconds_to_next < min_wait) {
- min_wait = seconds_to_next;
- }
- } else {
- logger_->log_debug("Failed to send keepalive to %s@%s:%hu",
- connection.first.username,
- connection.first.hostname,
- connection.first.port);
- }
- }
-
- /* Avoid busy loops */
- if (min_wait < 1) {
- min_wait = 1;
- }
-
- logger_->log_trace("Keepalive thread is going to sleep for %d s", min_wait);
- keepalive_cv_.wait_for(lock, std::chrono::seconds(min_wait), [this] {
- return !running_;
- });
- if (!running_) {
- lock.unlock();
- return;
- }
- }
}
void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ parseCommonPropertiesOnSchedule(context);
+
std::string value;
if (!context->getProperty(CreateDirectory.getName(), value)) {
logger_->log_error("Create Directory attribute is missing or invalid");
@@ -363,22 +175,6 @@
} else {
core::Property::StringToInt(value, batch_size_);
}
- if (!context->getProperty(ConnectionTimeout.getName(), value)) {
- logger_->log_error("Connection Timeout attribute is missing or invalid");
- } else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
- logger_->log_error("Connection Timeout attribute is invalid");
- }
- }
- if (!context->getProperty(DataTimeout.getName(), value)) {
- logger_->log_error("Data Timeout attribute is missing or invalid");
- } else {
- core::TimeUnit unit;
- if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
- logger_->log_error("Data Timeout attribute is invalid");
- }
- }
context->getProperty(ConflictResolution.getName(), conflict_resolution_);
if (context->getProperty(RejectZeroByte.getName(), value)) {
utils::StringUtils::StringToBool(value, reject_zero_byte_);
@@ -386,43 +182,13 @@
if (context->getProperty(DotRename.getName(), value)) {
utils::StringUtils::StringToBool(value, dot_rename_);
}
- context->getProperty(HostKeyFile.getName(), host_key_file_);
- if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
- logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
- } else {
- utils::StringUtils::StringToBool(value, strict_host_checking_);
- }
- if (!context->getProperty(UseKeepaliveOnTimeout.getName(), value)) {
- logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
- } else {
- utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);
- }
if (!context->getProperty(UseCompression.getName(), value)) {
logger_->log_error("Use Compression attribute is missing or invalid");
} else {
utils::StringUtils::StringToBool(value, use_compression_);
}
- context->getProperty(ProxyType.getName(), proxy_type_);
- if (use_keepalive_on_timeout_ && !keepalive_thread_.joinable()) {
- running_ = true;
- keepalive_thread_ = std::thread(&PutSFTP::keepaliveThreadFunc, this);
- }
-}
-
-void PutSFTP::notifyStop() {
- logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
- if (keepalive_thread_.joinable()) {
- {
- std::lock_guard<std::mutex> lock(connections_mutex_);
- running_ = false;
- keepalive_cv_.notify_one();
- }
- keepalive_thread_.join();
- }
- /* The thread is no longer running, we don't have to lock */
- connections_.clear();
- lru_.clear();
+ startKeepaliveThreadIfNeeded();
}
PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
@@ -443,30 +209,26 @@
*stream,
conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
stream->getSize() /*expected_size*/)) {
- return -1;
+ throw client_.getLastError();
}
- write_succeeded_ = true;
return stream->getSize();
}
-bool PutSFTP::ReadCallback::commit() {
- return write_succeeded_;
-}
-
bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
if (flow_file == nullptr) {
return false;
}
- /* Parse possibly FlowFile-dependent properties */
+ /* Parse common properties */
+ SFTPProcessorBase::CommonProperties common_properties;
+ if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
+ context->yield();
+ return false;
+ }
+
+ /* Parse processor-specific properties */
std::string filename;
- std::string hostname;
- uint16_t port = 0U;
- std::string username;
- std::string password;
- std::string private_key_path;
- std::string private_key_passphrase;
std::string remote_path;
bool disable_directory_listing = false;
std::string temp_file_name;
@@ -478,50 +240,11 @@
uint64_t remote_owner = 0U;
bool remote_group_set = false;
uint64_t remote_group = 0U;
- std::string proxy_host;
- uint16_t proxy_port = 0U;
- std::string proxy_username;
- std::string proxy_password;
flow_file->getKeyedAttribute(FILENAME, filename);
std::string value;
- if (!context->getProperty(Hostname, hostname, flow_file)) {
- logger_->log_error("Hostname attribute is missing");
- context->yield();
- return false;
- }
- if (!context->getProperty(Port, value, flow_file)) {
- logger_->log_error("Port attribute is missing or invalid");
- context->yield();
- return false;
- } else {
- int port_tmp;
- if (!core::Property::StringToInt(value, port_tmp) ||
- port_tmp < std::numeric_limits<uint16_t>::min() ||
- port_tmp > std::numeric_limits<uint16_t>::max()) {
- logger_->log_error("Port attribute \"%s\" is invalid", value);
- context->yield();
- return false;
- } else {
- port = static_cast<uint16_t>(port_tmp);
- }
- }
- if (!context->getProperty(Username, username, flow_file)) {
- logger_->log_error("Username attribute is missing");
- context->yield();
- return false;
- }
- context->getProperty(Password, password, flow_file);
- context->getProperty(PrivateKeyPath, private_key_path, flow_file);
- context->getProperty(PrivateKeyPassphrase, private_key_passphrase, flow_file);
- context->getProperty(Password, password, flow_file);
context->getProperty(RemotePath, remote_path, flow_file);
- if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
- utils::StringUtils::StringToBool(value, disable_directory_listing);
- } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
- utils::StringUtils::StringToBool(value, disable_directory_listing);
- }
/* Remove trailing slashes */
while (remote_path.size() > 1U && remote_path.back() == '/') {
remote_path.resize(remote_path.size() - 1);
@@ -530,6 +253,11 @@
if (remote_path.empty()) {
remote_path = ".";
}
+ if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
+ utils::StringUtils::StringToBool(value, disable_directory_listing);
+ } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
+ utils::StringUtils::StringToBool(value, disable_directory_listing);
+ }
context->getProperty(TempFilename, temp_file_name, flow_file);
if (context->getProperty(LastModifiedTime, value, flow_file)) {
if (core::Property::StringToDateTime(value, last_modified_time)) {
@@ -551,21 +279,6 @@
remote_group_set = true;
}
}
- context->getProperty(ProxyHost, proxy_host, flow_file);
- if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) {
- int port_tmp;
- if (!core::Property::StringToInt(value, port_tmp) ||
- port_tmp < std::numeric_limits<uint16_t>::min() ||
- port_tmp > std::numeric_limits<uint16_t>::max()) {
- logger_->log_error("Proxy Port attribute \"%s\" is invalid", value);
- context->yield();
- return false;
- } else {
- proxy_port = static_cast<uint16_t>(port_tmp);
- }
- }
- context->getProperty(HttpProxyUsername, proxy_username, flow_file);
- context->getProperty(HttpProxyPassword, proxy_password, flow_file);
/* Reject zero-byte files if needed */
if (reject_zero_byte_ && flow_file->getSize() == 0U) {
@@ -575,56 +288,21 @@
}
/* Get SFTPClient from cache or create it */
- const PutSFTP::ConnectionCacheKey connection_cache_key = {hostname, port, username, proxy_type_, proxy_host, proxy_port};
- auto client = getConnectionFromCache(connection_cache_key);
+ const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+ common_properties.port,
+ common_properties.username,
+ proxy_type_,
+ common_properties.proxy_host,
+ common_properties.proxy_port,
+ common_properties.proxy_username};
+ auto client = getOrCreateConnection(connection_cache_key,
+ common_properties.password,
+ common_properties.private_key_path,
+ common_properties.private_key_passphrase,
+ common_properties.proxy_password);
if (client == nullptr) {
- client = std::unique_ptr<utils::SFTPClient>(new utils::SFTPClient(hostname, port, username));
- if (!IsNullOrEmpty(host_key_file_)) {
- if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
- logger_->log_error("Cannot set host key file");
- context->yield();
- return false;
- }
- }
- if (!IsNullOrEmpty(password)) {
- client->setPasswordAuthenticationCredentials(password);
- }
- if (!IsNullOrEmpty(private_key_path)) {
- client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
- }
- if (proxy_type_ != PROXY_TYPE_DIRECT) {
- utils::HTTPProxy proxy;
- proxy.host = proxy_host;
- proxy.port = proxy_port;
- proxy.username = proxy_username;
- proxy.password = proxy_password;
- if (!client->setProxy(
- proxy_type_ == PROXY_TYPE_HTTP ? utils::SFTPClient::ProxyType::Http : utils::SFTPClient::ProxyType::Socks,
- proxy)) {
- logger_->log_error("Cannot set proxy");
- context->yield();
- return false;
- }
- }
- if (!client->setConnectionTimeout(connection_timeout_)) {
- logger_->log_error("Cannot set connection timeout");
- context->yield();
- return false;
- }
- client->setDataTimeout(data_timeout_);
- client->setSendKeepAlive(use_keepalive_on_timeout_);
- if (!client->setUseCompression(use_compression_)) {
- logger_->log_error("Cannot set compression");
- context->yield();
- return false;
- }
-
- /* Connect to SFTP server */
- if (!client->connect()) {
- logger_->log_error("Cannot connect to SFTP server");
- context->yield();
- return false;
- }
+ context->yield();
+ return false;
}
/*
@@ -638,13 +316,10 @@
/* Try to detect conflicts if needed */
std::string resolved_filename = filename;
if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
- std::stringstream target_path_ss;
- target_path_ss << remote_path << "/" << filename;
- auto target_path = target_path_ss.str();
+ std::string target_path = utils::file::FileUtils::concat_path(remote_path, filename, true /*force_posix*/);
LIBSSH2_SFTP_ATTRIBUTES attrs;
- bool file_not_exists;
- if (!client->stat(target_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
- if (!file_not_exists) {
+ if (!client->stat(target_path, true /*follow_symlinks*/, attrs)) {
+ if (client->getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
logger_->log_error("Failed to stat %s", target_path.c_str());
session->transfer(flow_file, Failure);
return true;
@@ -679,10 +354,9 @@
std::stringstream possible_resolved_filename_ss;
possible_resolved_filename_ss << i << "." << filename;
possible_resolved_filename = possible_resolved_filename_ss.str();
- auto possible_resolved_path = remote_path + "/" + possible_resolved_filename;
- bool file_not_exists;
- if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
- if (file_not_exists) {
+ std::string possible_resolved_path = utils::file::FileUtils::concat_path(remote_path, possible_resolved_filename, true /*force_posix*/);
+ if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
+ if (client->getLastError() == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
unique_name_generated = true;
break;
} else {
@@ -707,50 +381,26 @@
/* Create remote directory if needed */
if (create_directory_) {
- bool should_create_directory = disable_directory_listing;
- if (!disable_directory_listing) {
- LIBSSH2_SFTP_ATTRIBUTES attrs;
- bool file_not_exists;
- if (!client->stat(remote_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
- if (!file_not_exists) {
- logger_->log_error("Failed to stat %s", remote_path.c_str());
- }
- should_create_directory = true;
- } else {
- if (attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
- logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
- session->transfer(flow_file, Failure);
- put_connection_back_to_cache();
- return true;
- }
- logger_->log_debug("Found remote directory %s", remote_path.c_str());
- }
- }
- if (should_create_directory) {
- (void) client->createDirectoryHierarchy(remote_path);
- if (!disable_directory_listing) {
- LIBSSH2_SFTP_ATTRIBUTES attrs;
- bool file_not_exists;
- if (!client->stat(remote_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
- if (file_not_exists) {
- logger_->log_error("Could not find remote directory %s after creating it", remote_path.c_str());
- session->transfer(flow_file, Failure);
- put_connection_back_to_cache();
- return true;
- } else {
- logger_->log_error("Failed to stat %s", remote_path.c_str());
- context->yield();
- return false;
- }
- } else {
- if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
- logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
- session->transfer(flow_file, Failure);
- put_connection_back_to_cache();
- return true;
- }
- }
- }
+ auto res = createDirectoryHierarchy(*client, remote_path, disable_directory_listing);
+ switch (res) {
+ case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
+ break;
+ case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
+ context->yield();
+ return false;
+ case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
+ session->transfer(flow_file, Failure);
+ put_connection_back_to_cache();
+ return true;
+ case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
+ case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED:
+ session->transfer(flow_file, Failure);
+ put_connection_back_to_cache();
+ return true;
+ default:
+ logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
+ context->yield();
+ return false;
}
}
@@ -765,15 +415,13 @@
target_path_ss << resolved_filename;
}
auto target_path = target_path_ss.str();
- std::stringstream final_target_path_ss;
- final_target_path_ss << remote_path << "/" << resolved_filename;
- auto final_target_path = final_target_path_ss.str();
+ std::string final_target_path = utils::file::FileUtils::concat_path(remote_path, resolved_filename, true /*force_posix*/);
logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
ReadCallback read_callback(target_path.c_str(), *client, conflict_resolution_);
- session->read(flow_file, &read_callback);
-
- if (!read_callback.commit()) {
+ try {
+ session->read(flow_file, &read_callback);
+ } catch (const utils::SFTPError&) {
session->transfer(flow_file, Failure);
return true;
}
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index d04fbf8..3d2a115 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -26,6 +26,7 @@
#include <mutex>
#include <thread>
+#include "SFTPProcessorBase.h"
#include "utils/ByteArrayCallback.h"
#include "FlowFileRecord.h"
#include "core/Processor.h"
@@ -44,7 +45,7 @@
namespace minifi {
namespace processors {
-class PutSFTP : public core::Processor {
+ class PutSFTP : public SFTPProcessorBase {
public:
static constexpr char const *CONFLICT_RESOLUTION_REPLACE = "REPLACE";
@@ -54,10 +55,6 @@
static constexpr char const *CONFLICT_RESOLUTION_FAIL = "FAIL";
static constexpr char const *CONFLICT_RESOLUTION_NONE = "NONE";
- static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";
- static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
- static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
-
static constexpr char const* ProcessorName = "PutSFTP";
@@ -68,35 +65,19 @@
virtual ~PutSFTP();
// Supported Properties
- static core::Property Hostname;
- static core::Property Port;
- static core::Property Username;
- static core::Property Password;
- static core::Property PrivateKeyPath;
- static core::Property PrivateKeyPassphrase;
static core::Property RemotePath;
static core::Property CreateDirectory;
static core::Property DisableDirectoryListing;
static core::Property BatchSize;
- static core::Property ConnectionTimeout;
- static core::Property DataTimeout;
static core::Property ConflictResolution;
static core::Property RejectZeroByte;
static core::Property DotRename;
static core::Property TempFilename;
- static core::Property HostKeyFile;
static core::Property LastModifiedTime;
static core::Property Permissions;
static core::Property RemoteOwner;
static core::Property RemoteGroup;
- static core::Property StrictHostKeyChecking;
- static core::Property UseKeepaliveOnTimeout;
static core::Property UseCompression;
- static core::Property ProxyType;
- static core::Property ProxyHost;
- static core::Property ProxyPort;
- static core::Property HttpProxyUsername;
- static core::Property HttpProxyPassword;
// Supported Relationships
static core::Relationship Success;
@@ -110,7 +91,6 @@
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
virtual void initialize() override;
virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
- virtual void notifyStop() override;
class ReadCallback : public InputStreamCallback {
public:
@@ -119,7 +99,6 @@
const std::string& conflict_resolution);
~ReadCallback();
virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
- bool commit();
private:
std::shared_ptr<logging::Logger> logger_;
@@ -131,43 +110,11 @@
private:
- std::shared_ptr<logging::Logger> logger_;
-
bool create_directory_;
uint64_t batch_size_;
- int64_t connection_timeout_;
- int64_t data_timeout_;
std::string conflict_resolution_;
bool reject_zero_byte_;
bool dot_rename_;
- std::string host_key_file_;
- bool strict_host_checking_;
- bool use_keepalive_on_timeout_;
- bool use_compression_;
- std::string proxy_type_;
-
- static constexpr size_t CONNECTION_CACHE_MAX_SIZE = 8U;
- struct ConnectionCacheKey {
- std::string hostname;
- uint16_t port;
- std::string username;
- std::string proxy_type;
- std::string proxy_host;
- uint16_t proxy_port;
-
- bool operator<(const ConnectionCacheKey& other) const;
- bool operator==(const ConnectionCacheKey& other) const;
- };
- std::mutex connections_mutex_;
- std::map<ConnectionCacheKey, std::unique_ptr<utils::SFTPClient>> connections_;
- std::list<ConnectionCacheKey> lru_;
- std::unique_ptr<utils::SFTPClient> getConnectionFromCache(const ConnectionCacheKey& key);
- void addConnectionToCache(const ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection);
-
- std::thread keepalive_thread_;
- bool running_;
- std::condition_variable keepalive_cv_;
- void keepaliveThreadFunc();
bool processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
};
diff --git a/extensions/sftp/processors/SFTPProcessorBase.cpp b/extensions/sftp/processors/SFTPProcessorBase.cpp
new file mode 100644
index 0000000..63c335a
--- /dev/null
+++ b/extensions/sftp/processors/SFTPProcessorBase.cpp
@@ -0,0 +1,475 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SFTPProcessorBase.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property SFTPProcessorBase::Hostname(  // required; expression language supported; no default value
+ core::PropertyBuilder::createProperty("Hostname")->withDescription("The fully qualified hostname or IP address of the remote system")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Port(  // required; range-checked in parseCommonPropertiesOnTrigger
+ core::PropertyBuilder::createProperty("Port")->withDescription("The port that the remote system is listening on for file transfers")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Username(  // required; expression language supported
+ core::PropertyBuilder::createProperty("Username")->withDescription("Username")
+ ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Password(  // optional; only applied to a new client when non-empty (see getOrCreateConnection)
+ core::PropertyBuilder::createProperty("Password")->withDescription("Password for the user account")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::PrivateKeyPath(  // optional; only applied to a new client when non-empty (see getOrCreateConnection)
+ core::PropertyBuilder::createProperty("Private Key Path")->withDescription("The fully qualified path to the Private Key file")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::PrivateKeyPassphrase(  // optional; passed along with the private key path
+ core::PropertyBuilder::createProperty("Private Key Passphrase")->withDescription("Password for the private key")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::StrictHostKeyChecking(  // required; defaults to false; read in parseCommonPropertiesOnSchedule
+ core::PropertyBuilder::createProperty("Strict Host Key Checking")->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
+ ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property SFTPProcessorBase::HostKeyFile(  // optional; read once in parseCommonPropertiesOnSchedule
+    core::PropertyBuilder::createProperty("Host Key File")->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no host key file will be used")
+    ->isRequired(false)->build());
+core::Property SFTPProcessorBase::ConnectionTimeout(  // required; defaults to "30 sec"; converted to milliseconds in parseCommonPropertiesOnSchedule
+ core::PropertyBuilder::createProperty("Connection Timeout")->withDescription("Amount of time to wait before timing out while creating a connection")
+ ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SFTPProcessorBase::DataTimeout(  // required; defaults to "30 sec"; converted to milliseconds in parseCommonPropertiesOnSchedule
+ core::PropertyBuilder::createProperty("Data Timeout")->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+ ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SFTPProcessorBase::SendKeepaliveOnTimeout(  // required; defaults to true; also gates startKeepaliveThreadIfNeeded
+ core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+ ->isRequired(true)->withDefaultValue<bool>(true)->build());
+core::Property SFTPProcessorBase::ProxyType(  // NOTE(review): the description mentions a "Proxy Configuration Controller Service", yet the values are DIRECT/HTTP/SOCKS - confirm the wording
+ core::PropertyBuilder::createProperty("Proxy Type")->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
+ "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
+ ->isRequired(false)
+ ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
+ PROXY_TYPE_HTTP,
+ PROXY_TYPE_SOCKS})
+ ->withDefaultValue(PROXY_TYPE_DIRECT)->build());  // DIRECT makes getOrCreateConnection skip the proxy setup
+core::Property SFTPProcessorBase::ProxyHost(  // optional; expression language supported
+ core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The fully qualified hostname or IP address of the proxy server")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::ProxyPort(  // optional; range-checked in parseCommonPropertiesOnTrigger when non-empty
+ core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::HttpProxyUsername(  // optional; expression language supported
+ core::PropertyBuilder::createProperty("Http Proxy Username")->withDescription("Http Proxy Username")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::HttpProxyPassword(  // optional; expression language supported
+ core::PropertyBuilder::createProperty("Http Proxy Password")->withDescription("Http Proxy Password")
+ ->isRequired(false)->supportsExpressionLanguage(true)->build());
+
+constexpr size_t SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE;  // out-of-class definition of the static constexpr member initialised in the header
+
+SFTPProcessorBase::SFTPProcessorBase(std::string name, utils::Identifier uuid)
+ : Processor(name, uuid),
+ connection_timeout_(0),  // both timeouts are assigned in parseCommonPropertiesOnSchedule
+ data_timeout_(0),
+ strict_host_checking_(false),
+ use_keepalive_on_timeout_(false),
+ use_compression_(false),  // never assigned in this file - NOTE(review): presumably set by derived processors
+ running_(true) {  // keepalive loop flag; cleared by the destructor and by cleanupConnectionCache
+}
+
+SFTPProcessorBase::~SFTPProcessorBase() {
+  if (!keepalive_thread_.joinable()) {
+    return;  // no keepalive thread to stop
+  }
+  std::unique_lock<std::mutex> guard(connections_mutex_);
+  running_ = false;  // changed under the mutex the keepalive thread waits on
+  keepalive_cv_.notify_one();
+  guard.unlock();  // the thread needs the mutex to wake up, so release it before joining
+  keepalive_thread_.join();
+}
+
+void SFTPProcessorBase::notifyStop() {  // stop hook: delegates all teardown to cleanupConnectionCache()
+ logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
+ cleanupConnectionCache();  // joins the keepalive thread (if running) and empties connections_ and lru_
+}
+
+void SFTPProcessorBase::addSupportedCommonProperties(std::set<core::Property>& supported_properties) {
+  // Adds every common SFTP property defined in this file to the caller's set, in the original insertion order.
+  const core::Property* const common_properties[] = {
+      &Hostname,
+      &Port,
+      &Username,
+      &Password,
+      &PrivateKeyPath, &PrivateKeyPassphrase,
+      &StrictHostKeyChecking, &HostKeyFile,
+      &ConnectionTimeout, &DataTimeout,
+      &SendKeepaliveOnTimeout,
+      &ProxyType, &ProxyHost, &ProxyPort,
+      &HttpProxyUsername, &HttpProxyPassword,
+  };
+  for (const core::Property* property : common_properties) {
+    supported_properties.insert(*property);
+  }
+}
+
+void SFTPProcessorBase::parseCommonPropertiesOnSchedule(const std::shared_ptr<core::ProcessContext>& context) {  // caches the properties that are read without a flow file; problems are only logged
+ std::string value;
+ if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
+ logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, strict_host_checking_);  // conversion result is not checked
+ }
+ context->getProperty(HostKeyFile.getName(), host_key_file_);  // optional; getOrCreateConnection tests it with IsNullOrEmpty
+ if (!context->getProperty(ConnectionTimeout.getName(), value)) {
+ logger_->log_error("Connection Timeout attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {  // parsed value is converted in place to milliseconds
+ logger_->log_error("Connection Timeout attribute is invalid");
+ }
+ }
+ if (!context->getProperty(DataTimeout.getName(), value)) {
+ logger_->log_error("Data Timeout attribute is missing or invalid");
+ } else {
+ core::TimeUnit unit;
+ if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {  // same in-place conversion as for the connection timeout
+ logger_->log_error("Data Timeout attribute is invalid");
+ }
+ }
+ if (!context->getProperty(SendKeepaliveOnTimeout.getName(), value)) {
+ logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
+ } else {
+ utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);  // also decides whether startKeepaliveThreadIfNeeded starts a thread
+ }
+ context->getProperty(ProxyType.getName(), proxy_type_);  // return value ignored; the property is optional
+}
+
+SFTPProcessorBase::CommonProperties::CommonProperties()
+    : port(0U),
+      proxy_port(0U) {
+  // Only the two port numbers need explicit zeroing; the std::string members default-construct to empty.
+}
+
+// Reads the per-trigger connection properties; returns false (after logging) if Hostname, Port or Username cannot be read or a port is outside 1..65535.
+bool SFTPProcessorBase::parseCommonPropertiesOnTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<FlowFileRecord>& flow_file, CommonProperties& common_properties) {
+  std::string value;
+  if (!context->getProperty(Hostname, common_properties.hostname, flow_file)) {
+    logger_->log_error("Hostname attribute is missing");
+    return false;
+  }
+  if (!context->getProperty(Port, value, flow_file)) {
+    logger_->log_error("Port attribute is missing or invalid");
+    return false;
+  } else {
+    int port_tmp;
+    if (!core::Property::StringToInt(value, port_tmp) ||
+        port_tmp <= std::numeric_limits<uint16_t>::min() ||  // port 0 is rejected as well
+        port_tmp > std::numeric_limits<uint16_t>::max()) {
+      logger_->log_error("Port attribute \"%s\" is invalid", value);
+      return false;
+    } else {
+      common_properties.port = static_cast<uint16_t>(port_tmp);
+    }
+  }
+  if (!context->getProperty(Username, common_properties.username, flow_file)) {
+    logger_->log_error("Username attribute is missing");
+    return false;
+  }
+  // Everything below is optional, so the getProperty results are deliberately ignored; each property is read once.
+  context->getProperty(Password, common_properties.password, flow_file);
+  context->getProperty(PrivateKeyPath, common_properties.private_key_path, flow_file);
+  context->getProperty(PrivateKeyPassphrase, common_properties.private_key_passphrase, flow_file);
+  context->getProperty(ProxyHost, common_properties.proxy_host, flow_file);
+  if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) {  // an empty proxy port leaves proxy_port untouched
+    int port_tmp;
+    if (!core::Property::StringToInt(value, port_tmp) ||
+        port_tmp <= std::numeric_limits<uint16_t>::min() ||
+        port_tmp > std::numeric_limits<uint16_t>::max()) {
+      logger_->log_error("Proxy Port attribute \"%s\" is invalid", value);
+      return false;
+    } else {
+      common_properties.proxy_port = static_cast<uint16_t>(port_tmp);
+    }
+  }
+  context->getProperty(HttpProxyUsername, common_properties.proxy_username, flow_file);
+  context->getProperty(HttpProxyPassword, common_properties.proxy_password, flow_file);
+  return true;
+}
+
+bool SFTPProcessorBase::ConnectionCacheKey::operator<(const SFTPProcessorBase::ConnectionCacheKey& other) const {
+  const auto fields_of = [](const ConnectionCacheKey& k) { return std::tie(k.hostname, k.port, k.username, k.proxy_type, k.proxy_host, k.proxy_port, k.proxy_username); };
+  return fields_of(*this) < fields_of(other);  // lexicographic order over all seven key fields
+}
+
+bool SFTPProcessorBase::ConnectionCacheKey::operator==(const SFTPProcessorBase::ConnectionCacheKey& other) const {
+  const auto fields_of = [](const ConnectionCacheKey& k) { return std::tie(k.hostname, k.port, k.username, k.proxy_type, k.proxy_host, k.proxy_port, k.proxy_username); };
+  return fields_of(*this) == fields_of(other);  // equal only when all seven key fields match
+}
+
+std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getConnectionFromCache(const SFTPProcessorBase::ConnectionCacheKey& key) {
+  // Hands the cached client for `key` over to the caller (nullptr when absent); the entry leaves both the map and the LRU list.
+  std::lock_guard<std::mutex> guard(connections_mutex_);
+  auto cached = connections_.find(key);
+  if (cached == connections_.end()) {
+    return nullptr;
+  }
+
+  logger_->log_debug("Removing %s@%s:%hu from SFTP connection pool",
+                     key.username,
+                     key.hostname,
+                     key.port);
+
+  auto lru_position = std::find(lru_.begin(), lru_.end(), key);
+  if (lru_position != lru_.end()) {
+    lru_.erase(lru_position);
+  } else {
+    logger_->log_trace("Assertion error: can't find key in LRU cache");
+  }
+
+  std::unique_ptr<utils::SFTPClient> client = std::move(cached->second);
+  connections_.erase(cached);
+  return client;
+}
+
+// Stores `connection` under `key` as the most recently used entry, evicting least recently used entries while the pool is full.
+void SFTPProcessorBase::addConnectionToCache(const SFTPProcessorBase::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
+  std::lock_guard<std::mutex> lock(connections_mutex_);
+
+  // std::map::emplace keeps an existing entry, so re-adding a cached key used to drop the new connection while lru_
+  // still gained a duplicate key. Remove the older entry first so lru_ and connections_ stay in sync.
+  if (connections_.erase(key) > 0U) {
+    lru_.remove(key);
+  }
+  while (connections_.size() >= SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE) {
+    const auto& lru_key = lru_.back();
+    logger_->log_debug("SFTP connection pool is full, removing %s@%s:%hu", lru_key.username, lru_key.hostname, lru_key.port);
+    connections_.erase(lru_key);
+    lru_.pop_back();  // lru_key dangles from here on and must not be used again
+  }
+
+  logger_->log_debug("Adding %s@%s:%hu to SFTP connection pool", key.username, key.hostname, key.port);
+  connections_.emplace(key, std::move(connection));
+  lru_.push_front(key);
+  keepalive_cv_.notify_one();  // wakes the keepalive thread if it is waiting for the cache to become non-empty
+}
+
+void SFTPProcessorBase::keepaliveThreadFunc() {  // body of keepalive_thread_: sends keepalives on cached connections until running_ is cleared
+ std::unique_lock<std::mutex> lock(connections_mutex_);  // held for the whole loop except while blocked in wait()/wait_for()
+
+ while (true) {
+ if (connections_.empty()) {
+ keepalive_cv_.wait(lock, [this] {  // sleep until a connection is cached or a stop is requested
+ return !running_ || !connections_.empty();
+ });
+ }
+ if (!running_) {
+ logger_->log_trace("Stopping keepalive thread");
+ lock.unlock();
+ return;
+ }
+
+ int min_wait = 10;  // upper bound, in seconds, for the sleep before the next round
+ for (auto &connection : connections_) {
+ int seconds_to_next = 0;
+ if (connection.second->sendKeepAliveIfNeeded(seconds_to_next)) {  // seconds_to_next is filled in by the client
+ logger_->log_debug("Sent keepalive to %s@%s:%hu if needed, next keepalive in %d s",
+ connection.first.username,
+ connection.first.hostname,
+ connection.first.port,
+ seconds_to_next);
+ if (seconds_to_next < min_wait) {
+ min_wait = seconds_to_next;
+ }
+ } else {  // a failed keepalive is only logged; the connection stays in the cache
+ logger_->log_debug("Failed to send keepalive to %s@%s:%hu",
+ connection.first.username,
+ connection.first.hostname,
+ connection.first.port);
+ }
+ }
+
+ /* Avoid busy loops */
+ if (min_wait < 1) {
+ min_wait = 1;
+ }
+
+ logger_->log_trace("Keepalive thread is going to sleep for %d s", min_wait);
+ keepalive_cv_.wait_for(lock, std::chrono::seconds(min_wait), [this] {  // returns early only when running_ is cleared; other notifications resume the wait
+ return !running_;
+ });
+ if (!running_) {
+ lock.unlock();
+ return;
+ }
+ }
+}
+
+void SFTPProcessorBase::startKeepaliveThreadIfNeeded() {
+  // Nothing to do when keepalives are disabled or the thread object is already joinable.
+  if (!use_keepalive_on_timeout_ || keepalive_thread_.joinable()) return;
+  running_ = true;
+  keepalive_thread_ = std::thread(&SFTPProcessorBase::keepaliveThreadFunc, this);
+}
+
+void SFTPProcessorBase::cleanupConnectionCache() {
+  // Stops and joins the keepalive thread (if there is one), then drops every cached connection.
+  if (keepalive_thread_.joinable()) {
+    std::unique_lock<std::mutex> guard(connections_mutex_);
+    running_ = false;
+    keepalive_cv_.notify_one();
+    guard.unlock();  // the thread needs the mutex to wake up, so release it before joining
+    keepalive_thread_.join();
+  }
+  /* The thread is no longer running, we don't have to lock */
+  lru_.clear();
+  connections_.clear();
+}
+
+std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getOrCreateConnection(  // returns a cached client for the key, or configures and connects a new one; nullptr on any setup/connect failure
+ const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
+ const std::string& password,
+ const std::string& private_key_path,
+ const std::string& private_key_passphrase,
+ const std::string& proxy_password) {
+ auto client = getConnectionFromCache(connection_cache_key);  // a hit removes the entry from the cache and is returned as-is
+ if (client == nullptr) {
+ client = std::unique_ptr<utils::SFTPClient>(new utils::SFTPClient(connection_cache_key.hostname,
+ connection_cache_key.port,
+ connection_cache_key.username));
+ if (!IsNullOrEmpty(host_key_file_)) {  // strict_host_checking_ is only passed on when a host key file is configured
+ if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
+ logger_->log_error("Cannot set host key file");
+ return nullptr;
+ }
+ }
+ if (!IsNullOrEmpty(password)) {
+ client->setPasswordAuthenticationCredentials(password);
+ }
+ if (!IsNullOrEmpty(private_key_path)) {  // password and public key credentials may both be set
+ client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
+ }
+ if (connection_cache_key.proxy_type != PROXY_TYPE_DIRECT) {
+ utils::HTTPProxy proxy;
+ proxy.host = connection_cache_key.proxy_host;
+ proxy.port = connection_cache_key.proxy_port;
+ proxy.username = connection_cache_key.proxy_username;
+ proxy.password = proxy_password;  // the password is not part of the cache key, hence the separate parameter
+ if (!client->setProxy(
+ connection_cache_key.proxy_type == PROXY_TYPE_HTTP ? utils::SFTPClient::ProxyType::Http : utils::SFTPClient::ProxyType::Socks,  // any non-DIRECT, non-HTTP value is treated as SOCKS
+ proxy)) {
+ logger_->log_error("Cannot set proxy");
+ return nullptr;
+ }
+ }
+ if (!client->setConnectionTimeout(connection_timeout_)) {  // value prepared by parseCommonPropertiesOnSchedule
+ logger_->log_error("Cannot set connection timeout");
+ return nullptr;
+ }
+ client->setDataTimeout(data_timeout_);
+ client->setSendKeepAlive(use_keepalive_on_timeout_);
+ if (!client->setUseCompression(use_compression_)) {
+ logger_->log_error("Cannot set compression");
+ return nullptr;
+ }
+
+ /* Connect to SFTP server */
+ if (!client->connect()) {
+ logger_->log_error("Cannot connect to SFTP server");
+ return nullptr;
+ }
+ }
+
+ return client;  // the caller owns the client; this function never puts it back into the cache
+}
+
+SFTPProcessorBase::CreateDirectoryHierarchyError SFTPProcessorBase::createDirectoryHierarchy(  // ensures remote_path exists as a directory, creating it when it is missing
+ utils::SFTPClient& client,
+ const std::string& remote_path,
+ bool disable_directory_listing) {
+ bool should_create_directory = disable_directory_listing;  // with listing disabled the stat checks are skipped and creation is always attempted
+ if (!disable_directory_listing) {
+ LIBSSH2_SFTP_ATTRIBUTES attrs;
+ if (!client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
+ if (client.getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {  // a missing path is the expected case and is not logged
+ logger_->log_error("Failed to stat %s", remote_path.c_str());
+ }
+ should_create_directory = true;  // creation is attempted after any stat failure, not only "file not exists"
+ } else {
+ if (attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {  // parsed as (flags & PERMISSIONS) && !ISDIR, same as the parenthesised check further down
+ logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
+ return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
+ }
+ logger_->log_debug("Found remote directory %s", remote_path.c_str());
+ }
+ }
+ if (should_create_directory) {
+ (void) client.createDirectoryHierarchy(remote_path);  // result deliberately discarded; with listing enabled the stat below decides the outcome
+ if (!disable_directory_listing) {
+ LIBSSH2_SFTP_ATTRIBUTES attrs;
+ if (!client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
+ auto last_error = client.getLastError();
+ if (last_error == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
+ logger_->log_error("Could not find remote directory %s after creating it", remote_path.c_str());
+ return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND;
+ } else if (last_error == utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED) {
+ logger_->log_error("Permission denied when reading remote directory %s after creating it", remote_path.c_str());
+ return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED;
+ } else {
+ logger_->log_error("Failed to stat %s", remote_path.c_str());
+ return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED;
+ }
+ } else {
+ if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+ logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
+ return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
+ }
+ }
+ }
+ }
+
+ return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK;  // also reached, unverified, when listing is disabled
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/SFTPProcessorBase.h b/extensions/sftp/processors/SFTPProcessorBase.h
new file mode 100644
index 0000000..4d83fff
--- /dev/null
+++ b/extensions/sftp/processors/SFTPProcessorBase.h
@@ -0,0 +1,156 @@
+/**
+ * SFTPProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SFTP_PROCESSOR_BASE_H__
+#define __SFTP_PROCESSOR_BASE_H__
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class SFTPProcessorBase : public core::Processor {  // base class holding the common SFTP properties, the connection cache and the keepalive thread
+ public:
+ SFTPProcessorBase(std::string name, utils::Identifier uuid);
+ virtual ~SFTPProcessorBase();  // stops and joins the keepalive thread if it is running
+
+ // Supported Properties
+ static core::Property Hostname;
+ static core::Property Port;
+ static core::Property Username;
+ static core::Property Password;
+ static core::Property PrivateKeyPath;
+ static core::Property PrivateKeyPassphrase;
+ static core::Property StrictHostKeyChecking;
+ static core::Property HostKeyFile;
+ static core::Property ConnectionTimeout;
+ static core::Property DataTimeout;
+ static core::Property SendKeepaliveOnTimeout;
+ static core::Property TargetSystemTimestampPrecision;  // NOTE(review): SFTPProcessorBase.cpp has no definition for this member - confirm it is defined elsewhere or unused
+ static core::Property ProxyType;
+ static core::Property ProxyHost;
+ static core::Property ProxyPort;
+ static core::Property HttpProxyUsername;
+ static core::Property HttpProxyPassword;
+
+ static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";  // default Proxy Type; no proxy is configured on the client
+ static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
+ static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
+
+ virtual void notifyStop() override;  // logs, then calls cleanupConnectionCache()
+
+ protected:
+ std::shared_ptr<logging::Logger> logger_;  // not initialised by the base constructor - NOTE(review): presumably assigned by derived classes
+
+ int64_t connection_timeout_;  // milliseconds after parseCommonPropertiesOnSchedule
+ int64_t data_timeout_;  // milliseconds after parseCommonPropertiesOnSchedule
+ std::string host_key_file_;
+ bool strict_host_checking_;
+ bool use_keepalive_on_timeout_;
+ bool use_compression_;
+ std::string proxy_type_;
+
+ void addSupportedCommonProperties(std::set<core::Property>& supported_properties);  // NOTE(review): <set> is not included directly by this header
+ void parseCommonPropertiesOnSchedule(const std::shared_ptr<core::ProcessContext>& context);
+ struct CommonProperties {  // values read per trigger by parseCommonPropertiesOnTrigger
+ std::string hostname;
+ uint16_t port;
+ std::string username;
+ std::string password;
+ std::string private_key_path;
+ std::string private_key_passphrase;
+ std::string remote_path;  // not filled in by parseCommonPropertiesOnTrigger
+ std::string proxy_host;
+ uint16_t proxy_port;
+ std::string proxy_username;
+ std::string proxy_password;
+
+ CommonProperties();  // zeroes port and proxy_port
+ };
+ bool parseCommonPropertiesOnTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<FlowFileRecord>& flow_file, CommonProperties& common_properties);
+
+ static constexpr size_t CONNECTION_CACHE_MAX_SIZE = 8U;  // addConnectionToCache evicts entries once the cache holds this many
+ struct ConnectionCacheKey {  // identifies a cached connection; passwords and key material are not part of the key
+ std::string hostname;
+ uint16_t port;
+ std::string username;
+ std::string proxy_type;
+ std::string proxy_host;
+ uint16_t proxy_port;
+ std::string proxy_username;
+
+ bool operator<(const ConnectionCacheKey& other) const;  // ordering used by the std::map below
+ bool operator==(const ConnectionCacheKey& other) const;  // used by std::find on lru_
+ };
+ std::mutex connections_mutex_;  // guards connections_, lru_ and running_
+ std::map<ConnectionCacheKey, std::unique_ptr<utils::SFTPClient>> connections_;
+ std::list<ConnectionCacheKey> lru_;  // most recently added key at the front, eviction from the back
+ std::unique_ptr<utils::SFTPClient> getConnectionFromCache(const ConnectionCacheKey& key);  // removes and returns the entry, or nullptr
+ void addConnectionToCache(const ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection);
+
+ std::thread keepalive_thread_;
+ bool running_;  // loop flag of keepaliveThreadFunc
+ std::condition_variable keepalive_cv_;  // NOTE(review): <condition_variable> is not included directly by this header
+ void keepaliveThreadFunc();
+
+ void startKeepaliveThreadIfNeeded();  // starts the thread only when keepalives are enabled and it is not already running
+ void cleanupConnectionCache();  // joins the keepalive thread, then clears connections_ and lru_
+ std::unique_ptr<utils::SFTPClient> getOrCreateConnection(  // cached client or a newly connected one; nullptr on failure
+ const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
+ const std::string& password,
+ const std::string& private_key_path,
+ const std::string& private_key_passphrase,
+ const std::string& proxy_password);
+
+ enum class CreateDirectoryHierarchyError : uint8_t {  // result codes of createDirectoryHierarchy
+ CREATE_DIRECTORY_HIERARCHY_ERROR_OK = 0,
+ CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED,
+ CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY,
+ CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND,
+ CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED,
+ };
+ CreateDirectoryHierarchyError createDirectoryHierarchy(utils::SFTPClient& client, const std::string& remote_path, bool disable_directory_listing);
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/tests/CMakeLists.txt b/extensions/sftp/tests/CMakeLists.txt
index 998d1ed..f8663f7 100644
--- a/extensions/sftp/tests/CMakeLists.txt
+++ b/extensions/sftp/tests/CMakeLists.txt
@@ -61,4 +61,4 @@
add_subdirectory(tools)
else()
message("Could find Java and Maven to build SFTPTestServer, disabling SFTP tests")
-endif()
\ No newline at end of file
+endif()
diff --git a/extensions/sftp/tests/FetchSFTPTests.cpp b/extensions/sftp/tests/FetchSFTPTests.cpp
new file mode 100644
index 0000000..d7e95c7
--- /dev/null
+++ b/extensions/sftp/tests/FetchSFTPTests.cpp
@@ -0,0 +1,425 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/FetchSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/PutFile.h"
+#include "tools/SFTPTestServer.h"
+
+class FetchSFTPTestsFixture {  // per-test fixture: temp directories, an SFTP test server and a GenerateFlowFile -> UpdateAttribute -> FetchSFTP -> LogAttribute -> PutFile plan
+ public:
+ FetchSFTPTestsFixture()
+ : src_dir(strdup("/tmp/sftps.XXXXXX"))  // heap copies of the directory templates, released with free() in the destructor
+ , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+ LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+ LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+ LogTestController::getInstance().setTrace<processors::FetchSFTP>();
+ LogTestController::getInstance().setTrace<processors::PutFile>();
+ LogTestController::getInstance().setDebug<processors::LogAttribute>();
+ LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+ // Create temporary directories
+ testController.createTempDirectory(src_dir);
+ REQUIRE(src_dir != nullptr);  // NOTE(review): runs after src_dir was already passed to createTempDirectory; the call's own result is not checked
+ testController.createTempDirectory(dst_dir);
+ REQUIRE(dst_dir != nullptr);  // NOTE(review): same ordering as for src_dir
+
+ // Start SFTP server
+ sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+ REQUIRE(true == sftp_server->start());
+
+ // Build MiNiFi processing graph
+ plan = testController.createPlan();
+ generate_flow_file = plan->addProcessor(
+ "GenerateFlowFile",
+ "GenerateFlowFile");
+ update_attribute = plan->addProcessor("UpdateAttribute",
+ "UpdateAttribute",
+ core::Relationship("success", "d"),
+ true);
+ fetch_sftp = plan->addProcessor(
+ "FetchSFTP",
+ "FetchSFTP",
+ core::Relationship("success", "d"),
+ true);
+ plan->addProcessor("LogAttribute",  // connected through all four relationships listed below
+ "LogAttribute",
+ { core::Relationship("success", "d"),
+ core::Relationship("comms.failure", "d"),
+ core::Relationship("not.found", "d"),
+ core::Relationship("permission.denied", "d") },
+ true);
+ put_file = plan->addProcessor("PutFile",
+ "PutFile",
+ core::Relationship("success", "d"),
+ true);
+
+ // Configure GenerateFlowFile processor
+ plan->setProperty(generate_flow_file, "File Size", "1B");
+
+ // Configure FetchSFTP processor
+ plan->setProperty(fetch_sftp, "Hostname", "localhost");
+ plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
+ plan->setProperty(fetch_sftp, "Username", "nifiuser");
+ plan->setProperty(fetch_sftp, "Password", "nifipassword");
+ plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
+ plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
+ plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
+ plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
+ plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
+ plan->setProperty(fetch_sftp, "Use Compression", "false");
+
+ // Configure PutFile processor
+ plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");  // ${path} is left as an expression in the property value
+ plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+ plan->setProperty(put_file, "Create Missing Directories", "true");
+ }
+
+ virtual ~FetchSFTPTestsFixture() {
+ free(src_dir);  // matches the strdup() calls in the constructor
+ free(dst_dir);
+ LogTestController::getInstance().reset();
+ }
+
+ // Create source file
+ void createFile(const std::string& relative_path, const std::string& content) {  // writes content to <src_dir>/vfs/<relative_path>
+ std::fstream file;
+ std::stringstream ss;
+ ss << src_dir << "/vfs/" << relative_path;
+ utils::file::FileUtils::create_dir(utils::file::FileUtils::get_parent_path(ss.str())); // TODO - NOTE(review): result is not checked
+ file.open(ss.str(), std::ios::out);
+ file << content;
+ file.close();
+ }
+
+ enum TestWhere {  // selects which directory tree a check looks at
+ IN_DESTINATION,
+ IN_SOURCE
+ };
+
+ void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {  // requires that the file exists and its whole content equals expected_content
+ std::stringstream resultFile;
+ if (where == IN_DESTINATION) {
+ resultFile << dst_dir << "/" << relative_path;
+ } else {
+ resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+ /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+ REQUIRE(0 == chmod(resultFile.str().c_str(), 0644));
+#endif
+ }
+ std::ifstream file(resultFile.str());
+ REQUIRE(true == file.good());
+ std::stringstream content;
+ std::vector<char> buffer(1024U);
+ while (file) {  // read in 1024-byte chunks; gcount() covers the final short read
+ file.read(buffer.data(), buffer.size());
+ content << std::string(buffer.data(), file.gcount());
+ }
+ REQUIRE(expected_content == content.str());
+ }
+
+ void testFileNotExists(TestWhere where, const std::string& relative_path) {  // requires that the file cannot be opened
+ std::stringstream resultFile;
+ if (where == IN_DESTINATION) {
+ resultFile << dst_dir << "/" << relative_path;
+ } else {
+ resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+ /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+ REQUIRE(-1 == chmod(resultFile.str().c_str(), 0644));  // here chmod itself is expected to fail
+#endif
+ }
+ std::ifstream file(resultFile.str());
+ REQUIRE(false == file.is_open());
+ REQUIRE(false == file.good());
+ }
+
+ protected:
+ char *src_dir;  // directory handed to SFTPTestServer; the helpers above look under its "vfs" subdirectory
+ char *dst_dir;  // base of PutFile's target directory
+ std::unique_ptr<SFTPTestServer> sftp_server;
+ TestController testController;
+ std::shared_ptr<TestPlan> plan;
+ std::shared_ptr<core::Processor> generate_flow_file;
+ std::shared_ptr<core::Processor> update_attribute;
+ std::shared_ptr<core::Processor> fetch_sftp;
+ std::shared_ptr<core::Processor> put_file;
+};
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch one file", "[FetchSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  const std::string port_attribute = "key:sftp.remote.port value:" + std::to_string(sftp_server->getPort());
+  // Create the source file under <src_dir>/vfs and point FetchSFTP at it
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  testController.runSession(plan, true);
+  // The content must exist on both sides; routing and key/value pairs are checked through the captured log
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(logs.contains("from FetchSFTP to relationship success"));
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains(port_attribute));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP public key authentication", "[FetchSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  const std::string port_attribute = "key:sftp.remote.port value:" + std::to_string(sftp_server->getPort());
+  // The private key is resolved relative to the directory of the test executable
+  const auto private_key_path = utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa");
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Private Key Path", private_key_path);
+  plan->setProperty(fetch_sftp, "Private Key Passphrase", "privatekeypassword");
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+  // The distinguishing assertion of this case: the log must report authentication with the key
+  REQUIRE(logs.contains("Successfully authenticated with publickey"));
+  REQUIRE(logs.contains("from FetchSFTP to relationship success"));
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains(port_attribute));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-existing file", "[FetchSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // No source file is created in this case, so opening the requested remote file has to fail
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  testController.runSession(plan, true);
+  REQUIRE(logs.contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(logs.contains("from FetchSFTP to relationship not.found"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-readable file", "[FetchSFTP][basic]") {
+  // Skipped for uid 0, as stated in the message printed below
+  if (0 == getuid()) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  auto& logs = LogTestController::getInstance();
+  const std::string source_file = std::string(src_dir) + "/vfs/nifi_test/tstFile.ext";
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  // Strip every permission bit from the source file before the fetch is attempted
+  REQUIRE(0 == chmod(source_file.c_str(), 0000));
+  testController.runSession(plan, true);
+  REQUIRE(logs.contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(logs.contains("from FetchSFTP to relationship permission.denied"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch connection error", "[FetchSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  /* First pass: a regular run, so that the connection to the server gets opened */
+  testController.runSession(plan, true);
+  plan->reset();
+
+  /* Second pass: with the server object destroyed, the same fetch has to hit a connection error */
+  sftp_server.reset();
+  testController.runSession(plan, true);
+
+  REQUIRE(logs.contains("Failed to open remote file \"nifi_test/tstFile.ext\" due to an underlying SSH error"));
+  REQUIRE(logs.contains("from FetchSFTP to relationship comms.failure"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File success", "[FetchSFTP][completion-strategy]") {
+  auto& logs = LogTestController::getInstance();
+  const std::string port_attribute = "key:sftp.remote.port value:" + std::to_string(sftp_server->getPort());
+  // Fetch the file with the Delete File completion strategy selected
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  // Afterwards the source copy must be gone while the destination holds the content
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains(port_attribute));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File fail", "[FetchSFTP][completion-strategy]") {
+  if (0 == getuid()) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  auto& logs = LogTestController::getInstance();
+  const std::string port_attribute = "key:sftp.remote.port value:" + std::to_string(sftp_server->getPort());
+  const std::string source_parent_dir = std::string(src_dir) + "/vfs/nifi_test";
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  /* By making the parent directory non-writable we make it impossible to delete the source file */
+  REQUIRE(0 == chmod(source_parent_dir.c_str(), 0500));
+  testController.runSession(plan, true);
+
+  /* We should succeed even if the completion strategy fails */
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(logs.contains("Failed to remove remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(logs.contains("Completion Strategy is Delete File, but failed to delete remote file \"nifi_test/tstFile.ext\""));
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains(port_attribute));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File success", "[FetchSFTP][completion-strategy]") {
+  auto& logs = LogTestController::getInstance();
+  const std::string port_attribute = "key:sftp.remote.port value:" + std::to_string(sftp_server->getPort());
+  // Move the fetched file to nifi_done/ afterwards; this test never creates that directory itself, hence Create Directory
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
+  plan->setProperty(fetch_sftp, "Create Directory", "true");
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  // The file must have left its original place, arrived in nifi_done/ and been fetched to the destination
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains(port_attribute));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File fail", "[FetchSFTP][completion-strategy]") {
+  auto& logs = LogTestController::getInstance();
+  const std::string port_attribute = "key:sftp.remote.port value:" + std::to_string(sftp_server->getPort());
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
+  /* The completion strategy should fail because the target directory does not exist and we don't create it */
+  plan->setProperty(fetch_sftp, "Create Directory", "false");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+
+  /* We should succeed even if the completion strategy fails */
+  testFileNotExists(IN_SOURCE, "nifi_done/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(logs.contains("Failed to rename remote file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(logs.contains("Completion Strategy is Move File, but failed to move file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\""));
+
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains(port_attribute));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP expression language test", "[FetchSFTP]") {
+  auto& logs = LogTestController::getInstance();
+  const std::string port = std::to_string(sftp_server->getPort());
+  const auto private_key_path = utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa");
+  // Every connection and file setting is first set as a dynamic property on update_attribute ...
+  plan->setProperty(update_attribute, "attr_Hostname", "localhost", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Port", port, true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Username", "nifiuser", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Password", "nifipassword", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Private Key Path", private_key_path, true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Private Key Passphrase", "privatekeypassword", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Remote File", "nifi_test/tstFile.ext", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Move Destination Directory", "nifi_done/", true /*dynamic*/);
+
+  // ... and each FetchSFTP property is then given an expression that references the matching attribute name
+  plan->setProperty(fetch_sftp, "Hostname", "${'attr_Hostname'}");
+  plan->setProperty(fetch_sftp, "Port", "${'attr_Port'}");
+  plan->setProperty(fetch_sftp, "Username", "${'attr_Username'}");
+  plan->setProperty(fetch_sftp, "Password", "${'attr_Password'}");
+  plan->setProperty(fetch_sftp, "Private Key Path", "${'attr_Private Key Path'}");
+  plan->setProperty(fetch_sftp, "Private Key Passphrase", "${'attr_Private Key Passphrase'}");
+  plan->setProperty(fetch_sftp, "Remote File", "${'attr_Remote File'}");
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "${'attr_Move Destination Directory'}");
+
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Create Directory", "true");
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(logs.contains("Successfully authenticated with publickey"));
+  REQUIRE(logs.contains("from FetchSFTP to relationship success"));
+  REQUIRE(logs.contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains("key:sftp.remote.port value:" + port));
+  REQUIRE(logs.contains("key:path value:nifi_test/"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
new file mode 100644
index 0000000..f82b010
--- /dev/null
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -0,0 +1,907 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/ListSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "tools/SFTPTestServer.h"
+
+// Catch2 fixture for the ListSFTP test cases: starts an SFTPTestServer on a temporary directory and builds the plan.
+class ListSFTPTestsFixture {
+ public:
+  ListSFTPTestsFixture() : src_dir(strdup("/tmp/sftps.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::ListSFTP>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create the temporary directory; the strdup'd template is checked before use, the creation result afterwards
+    REQUIRE(src_dir != nullptr);
+    REQUIRE(testController.createTempDirectory(src_dir) != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    createPlan();
+  }
+
+  virtual ~ListSFTPTestsFixture() {
+    free(src_dir);  // allocated with strdup() in the constructor
+    LogTestController::getInstance().reset();
+  }
+  // (Re)creates the plan with a ListSFTP and a LogAttribute processor; a non-null list_sftp_uuid is used as ListSFTP's UUID.
+  void createPlan(utils::Identifier* list_sftp_uuid = nullptr) {
+    plan = testController.createPlan();
+    if (list_sftp_uuid == nullptr) {
+      list_sftp = plan->addProcessor(
+          "ListSFTP",
+          "ListSFTP");
+    } else {
+      list_sftp = plan->addProcessor(
+          "ListSFTP",
+          *list_sftp_uuid,
+          "ListSFTP",
+          {core::Relationship("success", "d")});
+    }
+    log_attribute = plan->addProcessor("LogAttribute",
+                                       "LogAttribute",
+                                       core::Relationship("success", "d"),
+                                       true);
+
+    // Configure ListSFTP processor
+    plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
+    plan->setProperty(list_sftp, "Hostname", "localhost");
+    plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(list_sftp, "Username", "nifiuser");
+    plan->setProperty(list_sftp, "Password", "nifipassword");
+    plan->setProperty(list_sftp, "Search Recursively", "false");
+    plan->setProperty(list_sftp, "Follow symlink", "false");
+    plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
+    plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
+    plan->setProperty(list_sftp, "Minimum File Size", "0 B");
+    // The timestamp precision is set exactly once, to a fixed value
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
+    plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
+    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
+
+    // Configure LogAttribute processor
+    plan->setProperty(log_attribute, "FlowFiles To Log", "0");
+  }
+
+  // Writes content to <src_dir>/vfs/<relative_path>; a non-zero modification_timestamp is applied as the last write time.
+  void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
+    const std::string full_path = std::string(src_dir) + "/vfs/" + relative_path;
+    // Collect the ancestors outermost-first so that each directory is created after its own parent
+    std::deque<std::string> parent_dirs;
+    std::string parent_dir = full_path;
+    while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
+      parent_dirs.push_front(parent_dir);
+    }
+    for (const auto& dir : parent_dirs) {
+      utils::file::FileUtils::create_dir(dir);
+    }
+    std::fstream file;
+    file.open(full_path, std::ios::out);
+    file << content;
+    file.close();
+    // A timestamp of 0 means "do not touch the modification time"
+    if (modification_timestamp != 0U) {
+      REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
+    }
+  }
+  // Same as createFile(), with the modification time set to "now + modification_timediff" seconds.
+  void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
+    const time_t now = time(nullptr);
+    createFile(relative_path, content, now + modification_timediff);
+  }
+
+ protected:
+  char *src_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> list_sftp;
+  std::shared_ptr<core::Processor> log_attribute;
+};
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // A single file in the listed directory must be routed to success and reported by name
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  REQUIRE(logs.contains("from ListSFTP to relationship success"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP public key authentication", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // Key-based credentials are added on top of the password configured by the fixture
+  const auto private_key_path = utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa");
+  plan->setProperty(list_sftp, "Private Key Path", private_key_path);
+  plan->setProperty(list_sftp, "Private Key Passphrase", "privatekeypassword");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  // The log must report authentication with the key, and the listing must still succeed
+  REQUIRE(logs.contains("Successfully authenticated with publickey"));
+  REQUIRE(logs.contains("from ListSFTP to relationship success"));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-existing dir", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // Nothing is created in this case, so the remote path configured below does not exist
+  plan->setProperty(list_sftp, "Remote Path", "nifi_test2/");
+  testController.runSession(plan, true);
+  REQUIRE(false == logs.contains("from ListSFTP to relationship success"));
+  REQUIRE(logs.contains("Failed to open remote directory \"nifi_test2\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(logs.contains("There are no files to list. Yielding."));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-readable dir", "[ListSFTP][basic]") {
+  if (0 == getuid()) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  auto& logs = LogTestController::getInstance();
+  const std::string listed_dir = std::string(src_dir) + "/vfs/nifi_test";
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(0 == chmod(listed_dir.c_str(), 0000));  // strip every permission bit from the listed directory
+  testController.runSession(plan, true);
+  REQUIRE(false == logs.contains("from ListSFTP to relationship success"));
+  REQUIRE(logs.contains("Failed to open remote directory \"nifi_test\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(logs.contains("There are no files to list. Yielding."));
+}
+#endif
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file writes attributes", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  // Gather the expected values straight from the local file created above
+  const auto local_file = std::string(src_dir) + "/vfs/nifi_test/tstFile.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(local_file);
+  std::string mtime_str;
+  REQUIRE(true == getDateTimeStr(mtime, mtime_str));
+  uint64_t uid, gid;
+  REQUIRE(true == utils::file::FileUtils::get_uid_gid(local_file, uid, gid));
+  uint32_t permissions;
+  REQUIRE(true == utils::file::FileUtils::get_permissions(local_file, permissions));
+  // Permissions are compared as a zero-padded, four-digit octal string
+  std::stringstream permissions_ss;
+  permissions_ss << std::setfill('0') << std::setw(4) << std::oct << permissions;
+  REQUIRE(logs.contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(logs.contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(logs.contains("key:sftp.listing.user value:nifiuser"));
+  REQUIRE(logs.contains("key:file.owner value:" + std::to_string(uid)));
+  REQUIRE(logs.contains("key:file.group value:" + std::to_string(gid)));
+  REQUIRE(logs.contains("key:file.permissions value:" + permissions_ss.str()));
+  REQUIRE(logs.contains("key:file.size value:14"));
+  REQUIRE(logs.contains("key:file.lastModifiedTime value:" + mtime_str));
+  REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+  REQUIRE(logs.contains("key:path value:nifi_test"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // Two files directly inside the listed directory: both names must show up in the log
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
+  testController.runSession(plan, true);
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir no recursion", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // "Search Recursively" stays at the fixture's "false", so the file inside subdir/ must not be listed
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
+  testController.runSession(plan, true);
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  REQUIRE(false == logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir with recursion", "[ListSFTP][basic]") {
+  auto& logs = LogTestController::getInstance();
+  // With recursion switched on, the file inside subdir/ has to be listed as well
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
+  testController.runSession(plan, true);
+
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Age too young", "[ListSFTP][file-age]") {
+  auto& logs = LogTestController::getInstance();
+  // The fixture helper backdates the file by five minutes, which is younger than the two hours required here
+  plan->setProperty(list_sftp, "Minimum File Age", "2 hours");
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+
+  REQUIRE(false == logs.contains("key:filename value:tstFile.ext"));
+  REQUIRE(logs.contains("Ignoring \"nifi_test/tstFile.ext\" because it is younger than the Minimum File Age"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Age too old", "[ListSFTP][file-age]") {
+  auto& logs = LogTestController::getInstance();
+  // The fixture helper backdates the file by five minutes, which is older than the one minute allowed here
+  plan->setProperty(list_sftp, "Maximum File Age", "1 min");
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+
+  REQUIRE(false == logs.contains("key:filename value:tstFile.ext"));
+  REQUIRE(logs.contains("Ignoring \"nifi_test/tstFile.ext\" because it is older than the Maximum File Age"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Size too small", "[ListSFTP][file-size]") {
+  auto& logs = LogTestController::getInstance();
+  // The 14-byte test file is far below the configured minimum and has to be ignored
+  plan->setProperty(list_sftp, "Minimum File Size", "1 MB");
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  REQUIRE(false == logs.contains("key:filename value:tstFile.ext"));
+  REQUIRE(logs.contains("Ignoring \"nifi_test/tstFile.ext\" because it is smaller than the Minimum File Size: 14 B < 1048576 B"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Size too large", "[ListSFTP][file-size]") {
+  auto& logs = LogTestController::getInstance();
+  // The 14-byte test file exceeds the configured maximum and has to be ignored
+  plan->setProperty(list_sftp, "Maximum File Size", "4 B");
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  testController.runSession(plan, true);
+  REQUIRE(false == logs.contains("key:filename value:tstFile.ext"));
+  REQUIRE(logs.contains("Ignoring \"nifi_test/tstFile.ext\" because it is larger than the Maximum File Size: 14 B > 4 B"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP File Filter Regex", "[ListSFTP][file-filter-regex]") {
+  auto& logs = LogTestController::getInstance();
+  // Of the two files created below, only file2.ext contains the '2' the filter asks for
+  plan->setProperty(list_sftp, "File Filter Regex", "^.*2.*$");
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
+  testController.runSession(plan, true);
+
+  REQUIRE(logs.contains("Ignoring \"nifi_test/file1.ext\" because it did not match the File Filter Regex \"^.*2.*$\""));
+  REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Path Filter Regex", "[ListSFTP][path-filter-regex]") {
+  auto& logs = LogTestController::getInstance();
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+  plan->setProperty(list_sftp, "Path Filter Regex", "^.*foobar.*$");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/foobar/file2.ext", "Test content 2");
+  createFileWithModificationTimeDiff("nifi_test/notbar/file3.ext", "Test with longer content 3");
+  testController.runSession(plan, true);
+
+  /* file1.ext sits directly in the listed root directory */
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  /* file2.ext sits in a subdirectory whose path matches the filter */
+  REQUIRE(logs.contains("key:filename value:file2.ext"));
+  /* file3.ext sits in a subdirectory that does not match, so that directory is never entered */
+  REQUIRE(logs.contains("Not recursing into \"nifi_test/notbar\" because it did not match the Path Filter Regex \"^.*foobar.*$\""));
+  REQUIRE(false == logs.contains("key:filename value:file3.ext"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false file symlink", "[ListSFTP][follow-symlink]") {
+  auto& logs = LogTestController::getInstance();
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  const std::string link_target = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  const std::string link_path = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
+  // file2.ext is a symlink to file1.ext; with "Follow symlink" left at the fixture's "false" it must be skipped
+  REQUIRE(0 == symlink(link_target.c_str(), link_path.c_str()));
+  testController.runSession(plan, true);
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  REQUIRE(logs.contains("Skipping non-regular, non-directory file \"nifi_test/file2.ext\""));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true file symlink", "[ListSFTP][follow-symlink]") {
+  auto& logs = LogTestController::getInstance();
+  plan->setProperty(list_sftp, "Follow symlink", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  const std::string link_target = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  const std::string link_path = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
+  REQUIRE(0 == symlink(link_target.c_str(), link_path.c_str()));
+  testController.runSession(plan, true);
+  // With symlinks followed, the link name is reported next to the name of the file it points to
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false directory symlink", "[ListSFTP][follow-symlink]") {
+  auto& logs = LogTestController::getInstance();
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
+  const std::string real_dir = std::string(src_dir) + "/vfs/nifi_test/dir1";
+  const std::string link_dir = std::string(src_dir) + "/vfs/nifi_test/dir2";
+  REQUIRE(0 == symlink(real_dir.c_str(), link_dir.c_str()));
+  testController.runSession(plan, true);
+  // dir1 is recursed into, while the dir2 symlink is skipped because symlinks are not followed
+  REQUIRE(logs.contains("key:filename value:file1.ext"));
+  REQUIRE(logs.contains("Skipping non-regular, non-directory file \"nifi_test/dir2\""));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true directory symlink", "[ListSFTP][follow-symlink]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Search Recursively", "true");
+ plan->setProperty(list_sftp, "Follow symlink", "true");
+ createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
+ const std::string link_target = std::string(src_dir) + "/vfs/nifi_test/dir1";
+ const std::string link_path = std::string(src_dir) + "/vfs/nifi_test/dir2";
+ REQUIRE(symlink(link_target.c_str(), link_path.c_str()) == 0);
+ // The file must be reported through the real directory as well as through the directory link.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("key:path value:nifi_test/dir1"));
+ REQUIRE(logs.contains("key:path value:nifi_test/dir2"));
+}
+#endif
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+ // First run: the file is expected to be listed and routed to success.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+ // Second run over the unchanged directory: nothing may be routed to success again.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one new file", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a file stamped 4 minutes in the past is added and is expected to be listed.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one older file", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a file stamped 6 minutes in the past is added and is expected to be skipped as not new.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6 * 60 /* 6 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("Skipping \"nifi_test/file2.ext\", because it is not new."));
+ REQUIRE(logs.contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file another file with the same timestamp", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+ const std::string first_file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+ const auto first_mtime = utils::file::FileUtils::last_write_time(first_file);
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a second file carrying exactly file1.ext's modification time must be listed on its own.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ /* We must sleep to avoid triggering the listing lag. */
+ std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+ createFile("nifi_test/file2.ext", "Test content 2", first_mtime);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file timestamp updated", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+ const std::string tracked_file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+ const auto original_mtime = utils::file::FileUtils::last_write_time(tracked_file);
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: the modification time is bumped by one, so the file is expected to be listed again.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ REQUIRE(true == utils::file::FileUtils::set_last_write_time(tracked_file, original_mtime + 1));
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ /* We must sleep to avoid triggering the listing lag. */
+ std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("and all files for that timestamp has been processed. Yielding."));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Build a new plan, handing over the previous processor's UUID; the state file is expected to be loaded.
+ utils::Identifier previous_uuid;
+ REQUIRE(true == list_sftp->getUUID(previous_uuid));
+ createPlan(&previous_uuid);
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("Successfully loaded Tracking Timestamps state file"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Build a new plan with the same UUID but a different Remote Path; the stored state must be ignored.
+ utils::Identifier previous_uuid;
+ REQUIRE(true == list_sftp->getUUID(previous_uuid));
+ createPlan(&previous_uuid);
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("was created with different settings than the current ones, ignoring. "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed configuration", "[ListSFTP][tracking-timestamps]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Same plan, but the Remote Path is changed before the second run; both files must then be listed.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+ plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("was created with different settings than the current ones, ignoring. "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+ // First run: the file is expected to be listed and routed to success.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:tstFile.ext"));
+ // Second run over the unchanged directory: the file must be reported as unchanged and not re-listed.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("Skipping file \"nifi_test/tstFile.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one new file", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a file stamped 4 minutes in the past is added and is expected to be listed.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file in tracking window", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a file stamped 6 minutes in the past is added; it must be listed, file1.ext must not.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6 * 60 /* 6 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file outside tracking window", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a file stamped 6 hours in the past is added and is expected to be skipped as too old.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6 * 3600 /* 6 hours ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+ REQUIRE(logs.contains("Skipping \"nifi_test/file2.ext\" because it has an older timestamp than the minimum timestamp to list"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file another file with the same timestamp", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+ const std::string first_file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+ const auto first_mtime = utils::file::FileUtils::last_write_time(first_file);
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: a second file carrying exactly file1.ext's modification time must be listed on its own.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFile("nifi_test/file2.ext", "Test content 2", first_mtime);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file timestamp updated", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+ const std::string tracked_file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+ const auto original_mtime = utils::file::FileUtils::last_write_time(tracked_file);
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: the modification time is bumped by one, so the file is expected to be listed again.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ REQUIRE(true == utils::file::FileUtils::set_last_write_time(tracked_file, original_mtime + 1));
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("Found file \"nifi_test/file1.ext\" with newer timestamp"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size changes", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+ const std::string tracked_file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+ const auto original_mtime = utils::file::FileUtils::last_write_time(tracked_file);
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Second run: the file is rewritten with longer content but the same modification time.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ createFile("nifi_test/file1.ext", "Longer test content 1", original_mtime);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("Found file \"nifi_test/file1.ext\" with different size: 14 -> 21"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+
+ plan->reset();
+ logs.resetStream(logs.log_output);
+
+ testController.runSession(plan, true);
+
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Build a new plan, handing over the previous processor's UUID; the state file is expected to be loaded.
+ utils::Identifier previous_uuid;
+ REQUIRE(true == list_sftp->getUUID(previous_uuid));
+ createPlan(&previous_uuid);
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("Successfully loaded Tracking Entities state file"));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Build a new plan with the same UUID but a different Remote Path; the stored state must be ignored.
+ utils::Identifier previous_uuid;
+ REQUIRE(true == list_sftp->getUUID(previous_uuid));
+ createPlan(&previous_uuid);
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+ logs.resetStream(logs.log_output);
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("was created with different settings than the current ones, ignoring. "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed configuration", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // First run: file1.ext is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ // Same plan, but the Remote Path is changed before the second run; both files must then be listed.
+ plan->reset();
+ logs.resetStream(logs.log_output);
+ plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -4 * 60 /* 4 minutes ago */);
+
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("was created with different settings than the current ones, ignoring. "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+ REQUIRE(logs.contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity outside window", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
+ plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1", -20 * 60 /* 20 minutes ago */);
+ // The file is stamped 20 minutes in the past with a 10 minute window, so it is expected to be skipped.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("Skipping \"nifi_test/file1.ext\" because it has an older timestamp than the minimum timestamp to list"));
+ REQUIRE_FALSE(logs.contains("from ListSFTP to relationship success"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity inside window", "[ListSFTP][tracking-entities]") {
+ auto& logs = LogTestController::getInstance();
+ plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+ plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
+ plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ // The file is created with the helper's default time offset and is expected to be listed.
+ testController.runSession(plan, true);
+
+ REQUIRE(logs.contains("from ListSFTP to relationship success"));
+ REQUIRE(logs.contains("key:filename value:file1.ext"));
+}
diff --git a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
new file mode 100644
index 0000000..0282cb9
--- /dev/null
+++ b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
@@ -0,0 +1,269 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/FetchSFTP.h"
+#include "processors/ListSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/PutFile.h"
+#include "tools/SFTPTestServer.h"
+
+class ListThenFetchSFTPTestsFixture {
+ public:
+ ListThenFetchSFTPTestsFixture()
+ : src_dir(strdup("/tmp/sftps.XXXXXX"))
+ , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+ LogTestController::getInstance().setTrace<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::FlowController>();
+ LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+ LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+ LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+ LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+ LogTestController::getInstance().setTrace<processors::ListSFTP>();
+ LogTestController::getInstance().setTrace<processors::FetchSFTP>();
+ LogTestController::getInstance().setTrace<processors::PutFile>();
+ LogTestController::getInstance().setDebug<processors::LogAttribute>();
+ LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+ // Create temporary directories. The strdup()'d name templates are validated before use,
+ // so that a failed allocation is reported instead of a null pointer reaching createTempDirectory().
+ REQUIRE(src_dir != nullptr);
+ REQUIRE(dst_dir != nullptr);
+ testController.createTempDirectory(src_dir);
+ testController.createTempDirectory(dst_dir);
+
+ // Start SFTP server
+ sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+ REQUIRE(true == sftp_server->start());
+
+ // Build MiNiFi processing graph
+ plan = testController.createPlan();
+ list_sftp = plan->addProcessor(
+ "ListSFTP",
+ "ListSFTP");
+ fetch_sftp = plan->addProcessor(
+ "FetchSFTP",
+ "FetchSFTP",
+ core::Relationship("success", "d"),
+ true);
+ log_attribute = plan->addProcessor("LogAttribute",
+ "LogAttribute",
+ { core::Relationship("success", "d"),
+ core::Relationship("comms.failure", "d"),
+ core::Relationship("not.found", "d"),
+ core::Relationship("permission.denied", "d") },
+ true);
+ put_file = plan->addProcessor("PutFile",
+ "PutFile",
+ core::Relationship("success", "d"),
+ true);
+
+ // Configure ListSFTP processor
+ plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
+ plan->setProperty(list_sftp, "Hostname", "localhost");
+ plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
+ plan->setProperty(list_sftp, "Username", "nifiuser");
+ plan->setProperty(list_sftp, "Password", "nifipassword");
+ plan->setProperty(list_sftp, "Search Recursively", "false");
+ plan->setProperty(list_sftp, "Follow symlink", "false");
+ plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
+ plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
+ plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
+ plan->setProperty(list_sftp, "Data Timeout", "30 sec");
+ plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
+ plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
+ plan->setProperty(list_sftp, "Minimum File Size", "0 B");
+ plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");  // set exactly once; an earlier value would only be overwritten
+ plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
+ plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
+
+ // Configure FetchSFTP processor
+ plan->setProperty(fetch_sftp, "Hostname", "localhost");
+ plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
+ plan->setProperty(fetch_sftp, "Username", "nifiuser");
+ plan->setProperty(fetch_sftp, "Password", "nifipassword");
+ plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
+ plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
+ plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
+ plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
+ plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
+ plan->setProperty(fetch_sftp, "Use Compression", "false");
+ plan->setProperty(fetch_sftp, "Remote File", "${path}/${filename}");
+
+ // Configure LogAttribute processor
+ plan->setProperty(log_attribute, "FlowFiles To Log", "0");
+
+ // Configure PutFile processor
+ plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
+ plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+ plan->setProperty(put_file, "Create Missing Directories", "true");
+ }
+
+ // Releases the src_dir and dst_dir path buffers with free() and resets the LogTestController singleton.
+ // NOTE(review): free() presumes both buffers were malloc-allocated where they were created (not visible here) - confirm.
+ virtual ~ListThenFetchSFTPTestsFixture() {
+ free(src_dir);
+ free(dst_dir);
+ LogTestController::getInstance().reset();
+ }
+
+ /**
+  * Creates (or truncates) the file <src_dir>/vfs/<relative_path> and writes content into it.
+  * Every missing ancestor directory reported by FileUtils::get_parent_path is created first, outermost first.
+  * @param relative_path path of the file relative to <src_dir>/vfs/
+  * @param content text written to the file
+  * @param modification_timestamp value handed to FileUtils::set_last_write_time; 0 leaves the timestamp untouched
+  */
+ void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
+   std::stringstream ss;
+   ss << src_dir << "/vfs/" << relative_path;
+   const std::string full_path = ss.str();
+   /* Walk up the path, pushing to the front so that the outermost directory ends up first */
+   std::deque<std::string> parent_dirs;
+   std::string parent_dir = full_path;
+   while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
+     parent_dirs.push_front(parent_dir);
+   }
+   for (const auto& dir : parent_dirs) {
+     utils::file::FileUtils::create_dir(dir);
+   }
+   std::ofstream file(full_path);
+   /* Fail right here if the file could not be created, instead of in a later, unrelated content assertion */
+   REQUIRE(true == file.is_open());
+   file << content;
+   file.close();
+   if (modification_timestamp != 0U) {
+     REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
+   }
+ }
+
+ /* Convenience wrapper around createFile: the last-write time passed on is time(nullptr) + modification_timediff */
+ void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
+   const time_t current_time = time(nullptr);
+   createFile(relative_path, content, current_time + modification_timediff);
+ }
+
+ // Selects the directory tree that testFile() and testFileNotExists() resolve a relative path against.
+ enum TestWhere {
+ IN_DESTINATION,  // resolved as <dst_dir>/<relative_path>
+ IN_SOURCE  // resolved as <src_dir>/vfs/<relative_path>
+ };
+
+ /**
+  * Asserts that the file at relative_path can be read and that its whole content equals expected_content.
+  * @param where IN_DESTINATION resolves the path below dst_dir, anything else below <src_dir>/vfs/
+  * @param relative_path path of the file relative to the selected tree
+  * @param expected_content exact content the file must have
+  */
+ void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
+   std::stringstream path_stream;
+   if (where != IN_DESTINATION) {
+     path_stream << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+     /* mina-sshd sets the permissions of a file it has read to 0000, so make it readable again */
+     REQUIRE(0 == chmod(path_stream.str().c_str(), 0644));
+#endif
+   } else {
+     path_stream << dst_dir << "/" << relative_path;
+   }
+   const std::string file_path = path_stream.str();
+   std::ifstream input(file_path);
+   REQUIRE(true == input.good());
+   /* Read in fixed-size chunks; the final, short read is still appended through gcount() */
+   std::stringstream actual_content;
+   std::vector<char> chunk(1024U);
+   while (input) {
+     input.read(chunk.data(), chunk.size());
+     actual_content << std::string(chunk.data(), input.gcount());
+   }
+   REQUIRE(expected_content == actual_content.str());
+ }
+
+ /**
+  * Asserts that the file at relative_path cannot be opened.
+  * @param where IN_DESTINATION resolves the path below dst_dir, anything else below <src_dir>/vfs/
+  * @param relative_path path of the file relative to the selected tree
+  */
+ void testFileNotExists(TestWhere where, const std::string& relative_path) {
+   std::stringstream path_stream;
+   if (where != IN_DESTINATION) {
+     path_stream << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+     /* The chmod used as the mina-sshd permission workaround is required to fail here */
+     REQUIRE(-1 == chmod(path_stream.str().c_str(), 0644));
+#endif
+   } else {
+     path_stream << dst_dir << "/" << relative_path;
+   }
+   std::ifstream input(path_stream.str());
+   REQUIRE(false == input.is_open());
+   REQUIRE(false == input.good());
+ }
+
+ protected:
+ char *src_dir;
+ char *dst_dir;
+ std::unique_ptr<SFTPTestServer> sftp_server;
+ TestController testController;
+ std::shared_ptr<TestPlan> plan;
+ std::shared_ptr<core::Processor> list_sftp;
+ std::shared_ptr<core::Processor> fetch_sftp;
+ std::shared_ptr<core::Processor> log_attribute;
+ std::shared_ptr<core::Processor> put_file;
+};
+
+// Runs the whole plan in one session for a single file created under nifi_test/, then checks that the file
+// holds the same content both in the source tree and in the destination directory.
+TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP one file", "[ListThenFetchSFTP][basic]") {
+ createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+ testController.runSession(plan, true);
+
+ testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+ testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+}
+
+// Drives the plan processor by processor for two source files and checks both copies of each file afterwards.
+// FetchSFTP and PutFile are triggered a second time through runCurrentProcessor().
+// NOTE(review): presumably each trigger of those processors handles a single flow file, hence one run per file - confirm.
+TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP two files", "[ListThenFetchSFTP][basic]") {
+ createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+ createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2");
+
+ /* ListSFTP */
+ plan->runNextProcessor();
+
+ /* FetchSFTP */
+ plan->runNextProcessor();
+ plan->runCurrentProcessor();
+
+ /* LogAttribute */
+ plan->runNextProcessor();
+
+ /* PutFile */
+ plan->runNextProcessor();
+ plan->runCurrentProcessor();
+
+ testFile(IN_SOURCE, "nifi_test/file1.ext", "Test content 1");
+ testFile(IN_DESTINATION, "nifi_test/file1.ext", "Test content 1");
+ testFile(IN_SOURCE, "nifi_test/file2.ext", "Test content 2");
+ testFile(IN_DESTINATION, "nifi_test/file2.ext", "Test content 2");
+}
diff --git a/extensions/sftp/tests/PutSFTPTests.cpp b/extensions/sftp/tests/PutSFTPTests.cpp
index c59a7e6..b9189df 100644
--- a/extensions/sftp/tests/PutSFTPTests.cpp
+++ b/extensions/sftp/tests/PutSFTPTests.cpp
@@ -34,6 +34,9 @@
#include <functional>
#include <iterator>
#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
#include "TestBase.h"
#include "utils/StringUtils.h"
@@ -523,7 +526,10 @@
#ifndef WIN32
TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set uid and gid", "[PutSFTP]") {
- std::cerr << "!!!! This test ONLY works as root, because it needs to chown !!!!" << std::endl;
+ if (getuid() != 0) {
+ std::cerr << "!!!! This test ONLY works as root, because it needs to chown. Exiting. !!!!" << std::endl;
+ return;
+ }
plan->setProperty(put, "Remote Owner", "1234");
plan->setProperty(put, "Remote Group", "4567");
diff --git a/extensions/sftp/tests/tools/SFTPTestServer.cpp b/extensions/sftp/tests/tools/SFTPTestServer.cpp
index 90efe2f..681b88f 100644
--- a/extensions/sftp/tests/tools/SFTPTestServer.cpp
+++ b/extensions/sftp/tests/tools/SFTPTestServer.cpp
@@ -78,16 +78,11 @@
/* fork */
pid_t pid = fork();
if (pid == 0) {
- /* Set the child process's pgid to its pid, so we will be able to kill the entire process group */
- if (setpgid(0, 0) != 0) {
- std::cerr << "Failed to set PGID, errno: " << strerror(errno) << std::endl;
- exit(-1);
- }
/* execv */
std::vector<char*> args(4U);
args[0] = strdup("/bin/sh");
args[1] = strdup("-c");
- args[2] = strdup(("java -Djava.security.egd=file:/dev/./urandom -jar " + jar_path_ + " -w " + working_directory_ + " -k " + host_key_file_ + " &> " + server_log_file_path).c_str());
+ args[2] = strdup(("exec java -Djava.security.egd=file:/dev/./urandom -jar " + jar_path_ + " -w " + working_directory_ + " -k " + host_key_file_ + " >" + server_log_file_path + " 2>&1").c_str());
args[3] = nullptr;
execv("/bin/sh", args.data());
std::cerr << "Failed to start server, errno: " << strerror(errno) << std::endl;
@@ -127,7 +122,7 @@
throw std::runtime_error("Not implemented");
#else
if (server_pid_ != -1) {
- if (::kill(-server_pid_, SIGTERM) != 0) {
+ if (::kill(server_pid_, SIGTERM) != 0) {
logger_->log_error("Failed to kill child process, error: %s", strerror(errno));
return false;
}
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 4f80689..2553c09 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -359,7 +359,7 @@
}
static bool StringToDateTime(const std::string& input, int64_t& output) {
- int64_t temp = pareDateTimeStr(input);
+ int64_t temp = parseDateTimeStr(input);
if (temp == -1) {
return false;
}
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index a20059a..437bd71 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -23,6 +23,8 @@
#include <iomanip>
#include <sstream>
#include <chrono>
+#include <array>
+#include <limits>
#define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
@@ -67,7 +69,7 @@
* @param str the datetime string
* @returns Unix timestamp
*/
-inline int64_t pareDateTimeStr(const std::string &str) {
+inline int64_t parseDateTimeStr(const std::string &str) {
/**
* There is no strptime on Windows. As long as we have to parse a single date format this is not so bad,
* but if multiple formats will have to be supported in the future, it might be worth it to include
@@ -123,4 +125,22 @@
return time + timezone_offset;
}
+/**
+ * Formats a Unix timestamp as a UTC date-time string using the pattern "%Y-%m-%dT%H:%M:%SZ".
+ * @param unix_timestamp the Unix timestamp to format
+ * @param date_time_str receives the formatted string; left untouched when false is returned
+ * @returns true on success, false if the timestamp does not fit into time_t or cannot be converted/formatted
+ */
+inline bool getDateTimeStr(int64_t unix_timestamp, std::string& date_time_str) {
+  if (unix_timestamp > (std::numeric_limits<time_t>::max)() || unix_timestamp < (std::numeric_limits<time_t>::lowest)()) {
+    return false;
+  }
+  const time_t time = static_cast<time_t>(unix_timestamp);
+  /* gmtime() hands out a pointer to shared static storage; the reentrant variants fill a caller-owned struct instead */
+  struct tm gmt;
+#ifdef WIN32
+  if (gmtime_s(&gmt, &time) != 0) {
+    return false;
+  }
+#else
+  if (gmtime_r(&time, &gmt) == nullptr) {
+    return false;
+  }
+#endif
+  std::array<char, 64U> buf;
+  if (strftime(buf.data(), buf.size(), "%Y-%m-%dT%H:%M:%SZ", &gmt) == 0U) {
+    return false;
+  }
+
+  date_time_str = buf.data();
+  return true;
+}
+
#endif
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 5a57f91..0b000ed 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -33,6 +33,7 @@
#else
#include <sys/stat.h>
#include <sys/types.h>
+#include <utime.h>
#include <dirent.h>
#include <errno.h>
#endif
@@ -50,6 +51,8 @@
#include <tchar.h> // _tcscpy,_tcscat,_tcscmp
#include <string> // string
#include <algorithm> // replace
+#include <sys/types.h>
+#include <sys/utime.h> // _utime64
#endif
#ifdef __APPLE__
#include <mach-o/dyld.h>
@@ -79,16 +82,19 @@
FileUtils() = delete;
- /**
- * Deletes a directory, deleting recursively if delete_files_recursively is true
- * @param path current path to delete
- * @param delete_files_recursively deletes recursively
+ /*
+ * Get the platform-specific path separator.
+ * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+ * @return the path separator character
*/
-
- static char get_separator()
+ static char get_separator(bool force_posix = false)
{
#ifdef WIN32
- return '\\';
+ if (force_posix) {
+ return '/';
+ } else {
+ return '\\';
+ }
#else
return '/';
#endif
@@ -200,6 +206,20 @@
return 0;
}
+ /*
+  * Sets both the access time and the modification time of a file to write_time.
+  * @param path the file whose timestamps are changed
+  * @param write_time the timestamp handed to utime (_utime64 on Windows) for both fields
+  * @return true if the underlying utime call reported success
+  */
+ static bool set_last_write_time(const std::string &path, uint64_t write_time) {
+#ifdef WIN32
+   struct __utimbuf64 times;
+   times.actime = write_time;
+   times.modtime = write_time;
+   const int result = _utime64(path.c_str(), &times);
+#else
+   struct utimbuf times;
+   times.actime = write_time;
+   times.modtime = write_time;
+   const int result = utime(path.c_str(), &times);
+#endif
+   return result == 0;
+ }
+
#ifndef WIN32
static bool get_permissions(const std::string &path, uint32_t &permissions) {
struct stat result;
@@ -416,76 +436,99 @@
#endif
}
- static std::string concat_path(const std::string& root, const std::string& child) {
+ static std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
if (root.empty()) {
return child;
}
std::stringstream new_path;
- if (root.back() == get_separator()) {
+ if (root.back() == get_separator(force_posix)) {
new_path << root << child;
} else {
- new_path << root << get_separator() << child;
+ new_path << root << get_separator(force_posix) << child;
}
return new_path.str();
}
- static std::string get_parent_path(const std::string& path) {
+ static std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
if (path.empty()) {
- /* Empty path has no parent */
- return "";
+ /* Empty path has no parent and no child*/
+ return std::make_tuple("", "");
}
bool absolute = false;
size_t root_pos = 0U;
#ifdef WIN32
- if (path[0] == '\\') {
- absolute = true;
- if (path.size() < 2U) {
- return "";
- }
- if (path[1] == '\\') {
- if (path.size() >= 4U &&
- (path[2] == '?' || path[2] == '.') &&
- path[3] == '\\') {
- /* Probably an UNC path */
- root_pos = 4U;
- } else {
- /* Probably a \\server\-type path */
- root_pos = 2U;
+ if (!force_posix) {
+ if (path[0] == '\\') {
+ absolute = true;
+ if (path.size() < 2U) {
+ return std::make_tuple("", "");
}
- root_pos = path.find_first_of("\\", root_pos);
- if (root_pos == std::string::npos) {
- return "";
+ if (path[1] == '\\') {
+ if (path.size() >= 4U &&
+ (path[2] == '?' || path[2] == '.') &&
+ path[3] == '\\') {
+ /* Probably an UNC path */
+ root_pos = 4U;
+ } else {
+ /* Probably a \\server\-type path */
+ root_pos = 2U;
+ }
+ root_pos = path.find_first_of("\\", root_pos);
+ if (root_pos == std::string::npos) {
+ return std::make_tuple("", "");
+ }
}
+ } else if (path.size() >= 3U &&
+ toupper(path[0]) >= 'A' &&
+ toupper(path[0]) <= 'Z' &&
+ path[1] == ':' &&
+ path[2] == '\\') {
+ absolute = true;
+ root_pos = 2U;
}
- } else if (path.size() >= 3U &&
- toupper(path[0]) >= 'A' &&
- toupper(path[0]) <= 'Z' &&
- path[1] == ':' &&
- path[2] == '\\') {
- absolute = true;
- root_pos = 2U;
- }
+ } else {
#else
- if (path[0] == '/') {
- absolute = true;
- root_pos = 0U;
- }
+ if (true) {
#endif
+ if (path[0] == '/') {
+ absolute = true;
+ root_pos = 0U;
+ }
+ }
+ /* Maybe we are just a single relative child */
+ if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
+ return std::make_tuple("", path);
+ }
/* Ignore trailing separators */
size_t last_pos = path.size() - 1;
- while (last_pos > root_pos && path[last_pos] == get_separator()) {
+ while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
last_pos--;
}
if (absolute && last_pos == root_pos) {
/* This means we are only a root */
- return "";
+ return std::make_tuple("", "");
}
- /* Find parent */
- size_t last_separator = path.find_last_of(get_separator(), last_pos);
+ /* Find parent-child separator */
+ size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
if (last_separator == std::string::npos || last_separator < root_pos) {
- return "";
+ return std::make_tuple("", "");
}
- return path.substr(0, last_separator + 1);
+ std::string parent = path.substr(0, last_separator + 1);
+ std::string child = path.substr(last_separator + 1);
+
+ return std::make_tuple(std::move(parent), std::move(child));
+ }
+
+ /*
+  * Returns only the parent element of split_path(path, force_posix); the child element is discarded.
+  */
+ static std::string get_parent_path(const std::string& path, bool force_posix = false) {
+   return std::get<0>(split_path(path, force_posix));
+ }
+
+ /*
+  * Returns only the child element of split_path(path, force_posix); the parent element is discarded.
+  */
+ static std::string get_child_path(const std::string& path, bool force_posix = false) {
+ std::string child_path;
+ std::tie(std::ignore, child_path) = split_path(path, force_posix);
+ return child_path;
}
/*
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 73dc77a..2ef6f0b 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -70,7 +70,7 @@
auto &&it = properties_.find(name);
if (it != properties_.end()) {
- Property &orig_property = it->second;
+ Property orig_property = it->second;
Property new_property = orig_property;
new_property.setValue(value);
properties_[new_property.getName()] = new_property;
@@ -102,7 +102,7 @@
auto &&it = properties_.find(name);
if (it != properties_.end()) {
- Property &orig_property = it->second;
+ Property orig_property = it->second;
Property new_property = orig_property;
new_property.addValue(value);
properties_[new_property.getName()] = new_property;
@@ -125,7 +125,7 @@
auto it = properties_.find(prop.getName());
if (it != properties_.end()) {
- Property &orig_property = it->second;
+ Property orig_property = it->second;
Property new_property = orig_property;
new_property.setValue(value);
properties_[new_property.getName()] = new_property;
@@ -153,7 +153,7 @@
auto it = properties_.find(prop.getName());
if (it != properties_.end()) {
- Property &orig_property = it->second;
+ Property orig_property = it->second;
Property new_property = orig_property;
new_property.setValue(value);
properties_[new_property.getName()] = new_property;
@@ -230,7 +230,7 @@
auto &&it = dynamic_properties_.find(name);
if (it != dynamic_properties_.end()) {
- Property &orig_property = it->second;
+ Property orig_property = it->second;
Property new_property = orig_property;
new_property.setValue(value);
new_property.setSupportsExpressionLanguage(true);
@@ -248,7 +248,7 @@
auto &&it = dynamic_properties_.find(name);
if (it != dynamic_properties_.end()) {
- Property &orig_property = it->second;
+ Property orig_property = it->second;
Property new_property = orig_property;
new_property.addValue(value);
new_property.setSupportsExpressionLanguage(true);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 43eead4..f496b3f 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -96,6 +96,23 @@
return processor;
}
+// Instantiates processor_name through the default class loader with the caller-supplied uuid, names the new
+// processor and hands it to the addProcessor overload taking a processor instance.
+// Returns nullptr once the plan is finalized; throws std::exception if the class loader yields no instance.
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  auto instance = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
+  if (instance == nullptr) {
+    throw std::exception();
+  }
+
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(instance);
+  processor->setName(name);
+  return addProcessor(processor, name, relationships, linkToPrevious);
+}
+
std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
if (finalized) {
return nullptr;
@@ -108,15 +125,7 @@
std::cout << "generated " << uuid.to_string() << std::endl;
- auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
- if (nullptr == ptr) {
- throw std::exception();
- }
- std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
-
- processor->setName(name);
-
- return addProcessor(processor, name, relationships, linkToPrevious);
+ return addProcessor(processor_name, uuid, name, relationships, linkToPrevious);
}
bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
@@ -169,6 +178,7 @@
}
std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
process_sessions_.push_back(current_session);
+ current_flowfile_ = nullptr;
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
if (verify != nullptr) {
@@ -178,7 +188,30 @@
processor->onTrigger(context, current_session);
}
current_session->commit();
- current_flowfile_ = current_session->get();
+ return location + 1 < processor_queue_.size();
+}
+
+// Triggers the processor at the current position (`location`) once more, in a fresh ProcessSession.
+// `location` is not modified here, so the plan does not advance.
+// @param verify when non-null it is invoked with the context and the new session instead of the processor's onTrigger
+// @return true if at least one more processor follows the current one in processor_queue_
+// NOTE(review): at() throws if no processor has been run yet and `location` is out of range - confirm its initial value.
+bool TestPlan::runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+ if (!finalized) {
+ finalize();
+ }
+ // NOTE(review): the size() arguments are formatted with %d - confirm the logger copes with size_t values.
+ logger_->log_info("Rerunning current processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size());
+ std::lock_guard<std::recursive_mutex> guard(mutex);
+
+ std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
+ std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
+ std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
+ // Every rerun appends one more session, so process_sessions_ can hold more entries than processors were run.
+ process_sessions_.push_back(current_session);
+ // Drop the cached flow file so that getCurrentFlowFile() fetches it again.
+ current_flowfile_ = nullptr;
+ processor->incrementActiveTasks();
+ processor->setScheduledState(core::ScheduledState::RUNNING);
+ if (verify != nullptr) {
+ verify(context, current_session);
+ } else {
+ logger_->log_info("Running %s", processor->getName());
+ processor->onTrigger(context, current_session);
+ }
+ current_session->commit();
return location + 1 < processor_queue_.size();
}
@@ -187,6 +220,9 @@
}
+// Returns the flow file obtained from the most recently created session, fetching it with get() on first use
+// and caching it in current_flowfile_ until the next processor run resets the cache.
std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {
+ if (current_flowfile_ == nullptr) {
+ // runCurrentProcessor() appends additional sessions, so process_sessions_ cannot be indexed by `location`:
+ // the session of the latest run is always the last element. at() still throws std::out_of_range when empty.
+ current_flowfile_ = process_sessions_.at(process_sessions_.size() - 1)->get();
+ }
return current_flowfile_;
}
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 33bf40e..21a2f43 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -187,12 +187,17 @@
std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
bool linkToPrevious = false);
+ std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
+ bool linkToPrevious = false);
+
bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
void reset();
bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+ bool runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> getProvenanceRecords();
std::shared_ptr<core::FlowFile> getCurrentFlowFile();
@@ -232,7 +237,6 @@
int location;
- std::shared_ptr<core::ProcessSession> current_session_;
std::shared_ptr<core::FlowFile> current_flowfile_;
std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 8b3d3ae..85c783b 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -59,6 +59,28 @@
#endif
}
+// Pins the expected results of FileUtils::get_child_path for relative and absolute paths on each platform:
+// the last path element is returned together with any trailing separator, and a bare root yields "".
+TEST_CASE("TestFileUtils::get_child_path", "[TestGetChildPath]") {
+#ifdef WIN32
+ REQUIRE("bar" == FileUtils::get_child_path("foo\\bar"));
+ REQUIRE("bar\\" == FileUtils::get_child_path("foo\\bar\\"));
+ REQUIRE("bar" == FileUtils::get_child_path("C:\\foo\\bar"));
+ REQUIRE("bar\\" == FileUtils::get_child_path("C:\\foo\\bar\\"));
+ REQUIRE("foo" == FileUtils::get_child_path("C:\\foo"));
+ REQUIRE("foo\\" == FileUtils::get_child_path("C:\\foo\\"));
+ REQUIRE("" == FileUtils::get_child_path("C:\\"));
+ REQUIRE("" == FileUtils::get_child_path("C:\\\\"));
+#else
+ REQUIRE("bar" == FileUtils::get_child_path("foo/bar"));
+ REQUIRE("bar/" == FileUtils::get_child_path("foo/bar/"));
+ REQUIRE("bar" == FileUtils::get_child_path("/foo/bar"));
+ REQUIRE("bar/" == FileUtils::get_child_path("/foo/bar/"));
+ REQUIRE("foo" == FileUtils::get_child_path("/foo"));
+ REQUIRE("foo/" == FileUtils::get_child_path("/foo/"));
+ REQUIRE("" == FileUtils::get_child_path("/"));
+ REQUIRE("" == FileUtils::get_child_path("//"));
+#endif
+}
+
TEST_CASE("TestFileUtils::get_executable_path", "[TestGetExecutablePath]") {
std::string executable_path = FileUtils::get_executable_path();
std::cerr << "Executable path: " << executable_path << std::endl;