MINIFICPP-911 - Added ListSFTP and FetchSFTP processors

This closes #586.

Signed-off-by: Marc Parisi <phrocker@apache.org>
diff --git a/NOTICE b/NOTICE
index 8984cd0..252860b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -22,6 +22,7 @@
     JniProcessSession extends and is based on ProcessSession
     JniProcessSessionFactory extends and is based on ProcessSessionFactory
     JniProvenanceReporter extends and is based on ProvenanceReporter
+    SFTPTestServer extends and is based on SSHTestServer
 
 This includes derived works from the cURL (MIT/X-style licensed) project (https://github.com/curl/curl):
 Copyright (c) 1996 - 2019, Daniel Stenberg, <daniel@haxx.se>, and many contributors, see the THANKS file.
diff --git a/extensions/sftp/CMakeLists.txt b/extensions/sftp/CMakeLists.txt
index 89837aa..2536ee1 100644
--- a/extensions/sftp/CMakeLists.txt
+++ b/extensions/sftp/CMakeLists.txt
@@ -70,6 +70,9 @@
 include_directories(${ZLIB_INCLUDE_DIRS})
 target_link_libraries(minifi-sftp ${ZLIB_LIBRARIES})
 
+# Include RapidJSON
+include_directories(thirdparty/rapidjson-1.1.0/include)
+
 if (WIN32)
 message("${OPENSSL_LIBRARIES}")
 	set (WIN32_ARCHIVES "")
diff --git a/extensions/sftp/SFTPLoader.h b/extensions/sftp/SFTPLoader.h
index dd36e24..0ab0286 100644
--- a/extensions/sftp/SFTPLoader.h
+++ b/extensions/sftp/SFTPLoader.h
@@ -18,8 +18,10 @@
 #ifndef EXTENSION_SFTPLOADER_H
 #define EXTENSION_SFTPLOADER_H
 
-#include "processors/PutSFTP.h"
 #include "core/ClassLoader.h"
+#include "processors/PutSFTP.h"
+#include "processors/FetchSFTP.h"
+#include "processors/ListSFTP.h"
 
 class SFTPFactoryInitializer : public core::ObjectFactoryInitializer {
  public:
@@ -57,6 +59,10 @@
   virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) {
     if (utils::StringUtils::equalsIgnoreCase(class_name, "PutSFTP")) {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::PutSFTP>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, "FetchSFTP")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::FetchSFTP>());
+    } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListSFTP")) {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::processors::ListSFTP>());
     } else {
       return nullptr;
     }
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 6d4254b..bc27d5a 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -65,8 +65,77 @@
   }
 }
 
+static SFTPError libssh2_sftp_error_to_sftp_error(unsigned long libssh2_sftp_error) {
+  switch (libssh2_sftp_error) {
+    case LIBSSH2_FX_OK:
+      return SFTPError::SFTP_ERROR_OK;
+    case LIBSSH2_FX_NO_SUCH_FILE:
+    case LIBSSH2_FX_NO_SUCH_PATH:
+      return SFTPError::SFTP_ERROR_FILE_NOT_EXISTS;
+    case LIBSSH2_FX_FILE_ALREADY_EXISTS:
+      return SFTPError::SFTP_ERROR_FILE_ALREADY_EXISTS;
+    case LIBSSH2_FX_PERMISSION_DENIED:
+    case LIBSSH2_FX_WRITE_PROTECT:
+    case LIBSSH2_FX_LOCK_CONFLICT:
+      return SFTPError::SFTP_ERROR_PERMISSION_DENIED;
+    case LIBSSH2_FX_NO_CONNECTION:
+    case LIBSSH2_FX_CONNECTION_LOST:
+      return SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE;
+    case LIBSSH2_FX_EOF:
+    case LIBSSH2_FX_FAILURE:
+    case LIBSSH2_FX_BAD_MESSAGE:
+    case LIBSSH2_FX_OP_UNSUPPORTED:
+    case LIBSSH2_FX_INVALID_HANDLE:
+    case LIBSSH2_FX_NO_MEDIA:
+    case LIBSSH2_FX_NO_SPACE_ON_FILESYSTEM:
+    case LIBSSH2_FX_QUOTA_EXCEEDED:
+    case LIBSSH2_FX_UNKNOWN_PRINCIPAL:
+    case LIBSSH2_FX_DIR_NOT_EMPTY:
+    case LIBSSH2_FX_NOT_A_DIRECTORY:
+    case LIBSSH2_FX_INVALID_FILENAME:
+    case LIBSSH2_FX_LINK_LOOP:
+    default:
+      return SFTPError::SFTP_ERROR_UNEXPECTED;
+  }
+}
+
 constexpr size_t SFTPClient::MAX_BUFFER_SIZE;
 
+LastSFTPError::LastSFTPError()
+    : sftp_error_set_(false)
+    , libssh2_sftp_error_(LIBSSH2_FX_OK)
+    , sftp_error_(SFTPError::SFTP_ERROR_OK) {
+}
+
+LastSFTPError& LastSFTPError::operator=(unsigned long libssh2_sftp_error) {
+  sftp_error_set_ = false;
+  libssh2_sftp_error_ = libssh2_sftp_error;
+  return *this;
+}
+
+LastSFTPError& LastSFTPError::operator=(const SFTPError& sftp_error) {
+  sftp_error_set_ = true;
+  sftp_error_ = sftp_error;
+  return *this;
+}
+
+LastSFTPError::operator unsigned long() const {
+  if (sftp_error_set_) {
+    return LIBSSH2_FX_OK;
+  } else {
+    return libssh2_sftp_error_;
+  }
+}
+
+LastSFTPError::operator SFTPError() const {
+  if (sftp_error_set_) {
+    return sftp_error_;
+  } else {
+    return libssh2_sftp_error_to_sftp_error(libssh2_sftp_error_);
+  }
+}
+
+
 SFTPClient::SFTPClient(const std::string &hostname, uint16_t port, const std::string& username)
     : logger_(logging::LoggerFactory<SFTPClient>::getLogger()),
       hostname_(hostname),
@@ -82,7 +151,8 @@
       easy_(nullptr),
       ssh_session_(nullptr),
       sftp_session_(nullptr),
-      connected_(false) {
+      connected_(false),
+      last_error_() {
   easy_ = curl_easy_init();
   if (easy_ == nullptr) {
     throw std::runtime_error("Cannot create curl easy handle");
@@ -403,10 +473,39 @@
   return true;
 }
 
+SFTPError SFTPClient::getLastError() const {
+  return last_error_;
+}
+
 bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_t expected_size /*= -1*/) {
-  LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), LIBSSH2_FXF_READ, 0);
+  /**
+   * SFTP servers should not set the mode of an existing file on open
+   * (see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-13, Page 33
+   * "The 'attrs' field is ignored if an existing file is opened."
+   * Unfortunately this is a later SFTP version specification than implemented by most servers.)
+   * But because this is the intuitively correct behaviour (especially when opening a file for read only),
+   * most servers (OpenSSH for example) implement it this way.
+   * mina-sshd, the server we use for testing, however did not until recently,
+   * causing all files we read to be set to 0000.
+   * The fix to make it behave correctly has been merged back to master, but not yet released:
+   * https://github.com/apache/mina-sshd/commit/19adb39e4706929b6e5a1b2df056a2b2a29fac4d
+   * If we encounter real servers that behave like this, a workaround would be to stat before opening the file
+   * and "re-setting" the mode we read earlier on open.
+   * An another option would be to patch libssh2 to not send permissions in attrs when opening a file for read only.
+   */
+  LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), LIBSSH2_FXF_READ, 0 /*mode*/);
   if (file_handle == nullptr) {
-    logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    int ssh_errno = libssh2_session_last_errno(ssh_session_);
+    /* We can only get the sftp error in this case if the ssh error is a protocol error */
+    if (ssh_errno == LIBSSH2_ERROR_SFTP_PROTOCOL) {
+      last_error_ = libssh2_sftp_last_error(sftp_session_);
+      logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
+    } else {
+      last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+      char *err_msg = nullptr;
+      libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+      logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
+    }
     return false;
   }
   utils::ScopeGuard guard([&file_handle]() {
@@ -419,7 +518,8 @@
   do {
     ssize_t read_ret = libssh2_sftp_read(file_handle, reinterpret_cast<char*>(buf.data()), buf.size());
     if (read_ret < 0) {
-      logger_->log_error("Failed to read remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+      last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+      logger_->log_error("Failed to read remote file \"%s\"", path.c_str());
       return false;
     } else if (read_ret == 0) {
       logger_->log_trace("EOF while reading remote file \"%s\"", path.c_str());
@@ -429,8 +529,9 @@
     total_read += read_ret;
     int remaining = read_ret;
     while (remaining > 0) {
-      int write_ret = output.writeData(buf.data() + (buf.size() - remaining), remaining);
+      int write_ret = output.writeData(buf.data() + (read_ret - remaining), remaining);
       if (write_ret < 0) {
+        last_error_ = LIBSSH2_FX_OK;
         logger_->log_error("Failed to write output");
         return false;
       }
@@ -439,6 +540,7 @@
   } while (true);
 
   if (expected_size >= 0 && total_read != expected_size) {
+    last_error_ = LIBSSH2_FX_OK;
     logger_->log_error("Remote file \"%s\" has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
   }
@@ -451,8 +553,17 @@
   logger_->log_trace("Opening remote file \"%s\"", path.c_str());
   LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), flags, 0644);
   if (file_handle == nullptr) {
-    logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
-    return false;
+    int ssh_errno = libssh2_session_last_errno(ssh_session_);
+    /* We can only get the sftp error in this case if the ssh error is a protocol error */
+    if (ssh_errno == LIBSSH2_ERROR_SFTP_PROTOCOL) {
+      last_error_ = libssh2_sftp_last_error(sftp_session_);
+      logger_->log_error("Failed to open remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
+    } else {
+      last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+      char *err_msg = nullptr;
+      libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+      logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
+    }
   }
   utils::ScopeGuard guard([this, &file_handle, &path]() {
     logger_->log_trace("Closing remote file \"%s\"", path.c_str());
@@ -470,8 +581,7 @@
   do {
     int read_ret = input.readData(buf.data(), buf.size());
     if (read_ret < 0) {
-      char *err_msg = nullptr;
-      libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
+      last_error_ = LIBSSH2_FX_OK;
       logger_->log_error("Error while reading input");
       return false;
     } else if (read_ret == 0) {
@@ -484,7 +594,8 @@
     while (remaining > 0) {
       int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
       if (write_ret < 0) {
-        logger_->log_error("Failed to write remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+        last_error_ = SFTPError::SFTP_ERROR_IO_ERROR;
+        logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
         return false;
       }
       logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
@@ -493,6 +604,7 @@
   } while (true);
 
   if (expected_size >= 0 && total_read != expected_size) {
+    last_error_ = LIBSSH2_FX_OK;
     logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
   }
@@ -524,10 +636,11 @@
       }
       continue;
     }
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
     logger_->log_error("Failed to rename remote file \"%s\" to \"%s\", error: %s",
         source_path.c_str(),
         target_path.c_str(),
-        sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+        sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -535,6 +648,7 @@
 
 bool SFTPClient::createDirectoryHierarchy(const std::string& path) {
   if (path.empty()) {
+    last_error_ = LIBSSH2_FX_OK;
     return false;
   }
   bool absolute = path[0] == '/';
@@ -552,7 +666,8 @@
       if (err != LIBSSH2_FX_FILE_ALREADY_EXISTS &&
           err != LIBSSH2_FX_FAILURE &&
           err != LIBSSH2_FX_PERMISSION_DENIED) {
-        logger_->log_error("Failed to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(err));
+        last_error_ = err;
+        logger_->log_error("Failed to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(last_error_));
         return false;
       } else {
         logger_->log_debug("Non-fatal failure to create remote directory \"%s\", error: %s", current_dir.c_str(), sftp_strerror(err));
@@ -564,7 +679,8 @@
 
 bool SFTPClient::removeFile(const std::string& path) {
   if (libssh2_sftp_unlink(sftp_session_, path.c_str()) != 0) {
-    logger_->log_error("Failed to remove remote file \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_error("Failed to remove remote file \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -572,7 +688,8 @@
 
 bool SFTPClient::removeDirectory(const std::string& path) {
   if (libssh2_sftp_rmdir(sftp_session_, path.c_str()) != 0) {
-    logger_->log_error("Failed to remove remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_error("Failed to remove remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -587,7 +704,8 @@
                                                           0 /* mode */,
                                                           LIBSSH2_SFTP_OPENDIR);
   if (dir_handle == nullptr) {
-    logger_->log_error("Failed to open remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_error("Failed to open remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   utils::ScopeGuard guard([&dir_handle]() {
@@ -605,16 +723,17 @@
                                       longentry.size(),
                                       &attrs);
     if (ret < 0) {
-      logger_->log_error("Failed to read remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+      last_error_ = libssh2_sftp_last_error(sftp_session_);
+      logger_->log_error("Failed to read remote directory \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
       return false;
     } else if (ret == 0) {
       break;
     }
     if (follow_symlinks && attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && LIBSSH2_SFTP_S_ISLNK(attrs.permissions)) {
+      std::stringstream new_path;
+      new_path << path << "/" << filename.data();
       auto orig_attrs = attrs;
-      bool file_not_exists;
-      if (!this->stat(path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-        logger_->log_debug("Failed to stat directory child \"%s/%s\", error: %s", path.c_str(), filename.data(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+      if (!this->stat(new_path.str(), true /*follow_symlinks*/, attrs)) {
         attrs = orig_attrs;
       }
     }
@@ -623,18 +742,14 @@
   return true;
 }
 
-bool SFTPClient::stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result, bool& file_not_exists) {
-  file_not_exists = false;
+bool SFTPClient::stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result) {
   if (libssh2_sftp_stat_ex(sftp_session_,
                             path.c_str(),
                             path.length(),
                             follow_symlinks ? LIBSSH2_SFTP_STAT : LIBSSH2_SFTP_LSTAT,
                             &result) != 0) {
-    auto error = libssh2_sftp_last_error(sftp_session_);
-    if (error == LIBSSH2_FX_NO_SUCH_FILE) {
-      file_not_exists = true;
-    }
-    logger_->log_debug("Failed to stat remote path \"%s\", error: %s", path.c_str(), sftp_strerror(error));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_debug("Failed to stat remote path \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
   return true;
@@ -646,8 +761,7 @@
   if ((!!(input.flags & SFTP_ATTRIBUTE_UID) != !!(input.flags & SFTP_ATTRIBUTE_GID)) ||
       (!!(input.flags & SFTP_ATTRIBUTE_MTIME) != !!(input.flags & SFTP_ATTRIBUTE_ATIME))) {
     /* Because we can only set these attributes in pairs, we must stat first to learn the other */
-    bool file_not_exists;
-    if (!this->stat(path, false /*follow_symlinks*/, attrs, file_not_exists)) {
+    if (!this->stat(path, false /*follow_symlinks*/, attrs)) {
       return false;
     }
   }
@@ -678,7 +792,8 @@
                            path.length(),
                            LIBSSH2_SFTP_SETSTAT,
                            &attrs) != 0) {
-    logger_->log_debug("Failed to setstat on remote path \"%s\", error: %s", path.c_str(), sftp_strerror(libssh2_sftp_last_error(sftp_session_)));
+    last_error_ = libssh2_sftp_last_error(sftp_session_);
+    logger_->log_debug("Failed to setstat on remote path \"%s\", error: %s", path.c_str(), sftp_strerror(last_error_));
     return false;
   }
 
diff --git a/extensions/sftp/client/SFTPClient.h b/extensions/sftp/client/SFTPClient.h
index 4d6ce49..8e6c234 100644
--- a/extensions/sftp/client/SFTPClient.h
+++ b/extensions/sftp/client/SFTPClient.h
@@ -40,6 +40,36 @@
 namespace minifi {
 namespace utils {
 
+enum class SFTPError : uint8_t {
+  SFTP_ERROR_OK = 0,
+  SFTP_ERROR_PERMISSION_DENIED,
+  SFTP_ERROR_FILE_NOT_EXISTS,
+  SFTP_ERROR_FILE_ALREADY_EXISTS,
+  SFTP_ERROR_COMMUNICATIONS_FAILURE,
+  SFTP_ERROR_IO_ERROR,
+  SFTP_ERROR_UNEXPECTED
+};
+
+class LastSFTPError {
+ public:
+  LastSFTPError();
+
+  LastSFTPError(const LastSFTPError&) = delete;
+  LastSFTPError(LastSFTPError&&) = delete;
+  LastSFTPError& operator=(const LastSFTPError&) = delete;
+  LastSFTPError& operator=(LastSFTPError&&) = delete;
+
+  LastSFTPError& operator=(unsigned long libssh2_sftp_error);
+  LastSFTPError& operator=(const SFTPError& sftp_error);
+  operator unsigned long() const;
+  operator SFTPError() const;
+
+ private:
+  bool sftp_error_set_;
+  unsigned long libssh2_sftp_error_;
+  SFTPError sftp_error_;
+};
+
 class SFTPClient {
  public:
 
@@ -77,6 +107,13 @@
 
   bool sendKeepAliveIfNeeded(int &seconds_to_next);
 
+  /**
+   * If any function below this returns false, this function provides the last SFTP-related error.
+   * If a function did not fail because of an SFTP-related error, this function will return SFTP_ERROR_OK.
+   * If this function is called after a function returns true, the return value is UNDEFINED.
+   */
+  SFTPError getLastError() const;
+
   bool getFile(const std::string& path, io::BaseStream& output, int64_t expected_size = -1);
 
   bool putFile(const std::string& path, io::BaseStream& input, bool overwrite, int64_t expected_size = -1);
@@ -92,7 +129,7 @@
   bool listDirectory(const std::string& path, bool follow_symlinks,
       std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>>& children_result);
 
-  bool stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result, bool& file_not_exists);
+  bool stat(const std::string& path, bool follow_symlinks, LIBSSH2_SFTP_ATTRIBUTES& result);
 
   static const uint32_t SFTP_ATTRIBUTE_PERMISSIONS = 0x00000001;
   static const uint32_t SFTP_ATTRIBUTE_UID         = 0x00000002;
@@ -148,6 +185,7 @@
 
   bool connected_;
 
+  LastSFTPError last_error_;
 };
 
 } /* namespace utils */
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
new file mode 100644
index 0000000..3bc156c
--- /dev/null
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <set>
+#include <string>
+#include <utility>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property FetchSFTP::RemoteFile(
+    core::PropertyBuilder::createProperty("Remote File")->withDescription("The fully qualified filename on the remote system")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property FetchSFTP::CompletionStrategy(
+    core::PropertyBuilder::createProperty("Completion Strategy")->withDescription("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be logged but the data will still be transferred.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({COMPLETION_STRATEGY_NONE,
+                                            COMPLETION_STRATEGY_MOVE_FILE,
+                                            COMPLETION_STRATEGY_DELETE_FILE})
+        ->withDefaultValue(COMPLETION_STRATEGY_NONE)->build());
+core::Property FetchSFTP::MoveDestinationDirectory(
+    core::PropertyBuilder::createProperty("Move Destination Directory")->withDescription("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+                                                                                         "This property is ignored unless the Completion Strategy is set to 'Move File'. "
+                                                                                         "The specified directory must already exist on the remote system if 'Create Directory' is disabled, or the rename will fail.")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property FetchSFTP::CreateDirectory(
+    core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property FetchSFTP::DisableDirectoryListing(
+    core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("Control how 'Move Destination Directory' is created when 'Completion Strategy' is 'Move File' and 'Create Directory' is enabled. "
+                                                                                        "If set to 'true', directory listing is not performed prior to create missing directories. "
+                                                                                        "By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
+                                                                                        "However, there are situations that you might need to disable the directory listing such as the following. "
+                                                                                        "Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
+                                                                                        "Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
+                                                                                        "then an error is returned because the directory already exists.")
+        ->isRequired(false)->withDefaultValue<bool>(false)->build());
+core::Property FetchSFTP::UseCompression(
+    core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+
+core::Relationship FetchSFTP::Success("success", "All FlowFiles that are received are routed to success");
+core::Relationship FetchSFTP::CommsFailure("comms.failure", "Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.");
+core::Relationship FetchSFTP::NotFound("not.found", "Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.");
+core::Relationship FetchSFTP::PermissionDenied("permission.denied", "Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.");
+
+void FetchSFTP::initialize() {
+  logger_->log_trace("Initializing FetchSFTP");
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  addSupportedCommonProperties(properties);
+  properties.insert(RemoteFile);
+  properties.insert(CompletionStrategy);
+  properties.insert(MoveDestinationDirectory);
+  properties.insert(CreateDirectory);
+  properties.insert(DisableDirectoryListing);
+  properties.insert(UseCompression);
+  setSupportedProperties(properties);
+
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(CommsFailure);
+  relationships.insert(NotFound);
+  relationships.insert(PermissionDenied);
+  setSupportedRelationships(relationships);
+}
+
+FetchSFTP::FetchSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : SFTPProcessorBase(name, uuid),
+      create_directory_(false),
+      disable_directory_listing_(false) {
+  logger_ = logging::LoggerFactory<FetchSFTP>::getLogger();
+}
+
+FetchSFTP::~FetchSFTP() {
+}
+
+void FetchSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  parseCommonPropertiesOnSchedule(context);
+
+  std::string value;
+  context->getProperty(CompletionStrategy.getName(), completion_strategy_);
+  if (!context->getProperty(CreateDirectory.getName(), value)) {
+    logger_->log_error("Create Directory attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, create_directory_);
+  }
+  if (!context->getProperty(DisableDirectoryListing.getName(), value)) {
+    logger_->log_error("Disable Directory Listing attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, disable_directory_listing_);
+  }
+  if (!context->getProperty(UseCompression.getName(), value)) {
+    logger_->log_error("Use Compression attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, use_compression_);
+  }
+
+  startKeepaliveThreadIfNeeded();
+}
+
+FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,
+                                    utils::SFTPClient& client)
+    : logger_(logging::LoggerFactory<FetchSFTP::WriteCallback>::getLogger())
+    , remote_file_(remote_file)
+    , client_(client) {
+}
+
+FetchSFTP::WriteCallback::~WriteCallback() {
+}
+
+int64_t FetchSFTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  if (!client_.getFile(remote_file_, *stream)) {
+    throw client_.getLastError();
+  }
+  return stream->getSize();
+}
+
+void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
+  if (flow_file == nullptr) {
+    return;
+  }
+
+  /* Parse common properties */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
+    context->yield();
+    return;
+  }
+
+  /* Parse processor-specific properties */
+  std::string remote_file;
+  std::string move_destination_directory;
+
+  context->getProperty(RemoteFile, remote_file, flow_file);
+  context->getProperty(MoveDestinationDirectory, move_destination_directory, flow_file);
+
+  /* Get SFTPClient from cache or create it */
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
+  if (client == nullptr) {
+    context->yield();
+    return;
+  }
+
+  /*
+   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
+   * So we will only call this when we're sure that the connection is OK.
+   */
+  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
+    addConnectionToCache(connection_cache_key, std::move(client));
+  };
+
+  /* Download file */
+  WriteCallback write_callback(remote_file, *client);
+  try {
+    session->write(flow_file, &write_callback);
+  } catch (const utils::SFTPError& error) {
+    switch (error) {
+      case utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED:
+        session->transfer(flow_file, PermissionDenied);
+        put_connection_back_to_cache();
+        return;
+      case utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS:
+        session->transfer(flow_file, NotFound);
+        put_connection_back_to_cache();
+        return;
+      case utils::SFTPError::SFTP_ERROR_COMMUNICATIONS_FAILURE:
+      case utils::SFTPError::SFTP_ERROR_IO_ERROR:
+        session->transfer(flow_file, CommsFailure);
+        return;
+      default:
+        session->transfer(flow_file, PermissionDenied);
+        return;
+    }
+  }
+
+  /* Set attributes */
+  std::string parent_path;
+  std::string child_path;
+  std::tie(parent_path, child_path) = utils::file::FileUtils::split_path(remote_file, true /*force_posix*/);
+
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, common_properties.hostname);
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(common_properties.port));
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_FILENAME, remote_file);
+  flow_file->updateKeyedAttribute(FILENAME, child_path);
+  if (!parent_path.empty()) {
+    flow_file->updateKeyedAttribute(PATH, parent_path);
+  }
+
+  /* Execute completion strategy */
+  if (completion_strategy_ == COMPLETION_STRATEGY_DELETE_FILE) {
+    if (!client->removeFile(remote_file)) {
+      logger_->log_warn("Completion Strategy is Delete File, but failed to delete remote file \"%s\"", remote_file);
+    }
+  } else if (completion_strategy_ == COMPLETION_STRATEGY_MOVE_FILE) {
+    bool should_move = true;
+    if (create_directory_) {
+      auto res = createDirectoryHierarchy(*client, move_destination_directory, disable_directory_listing_);
+      if (res != SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK) {
+        should_move = false;
+      }
+    }
+    if (!should_move) {
+      logger_->log_warn("Completion Strategy is Move File, but failed to create Move Destination Directory \"%s\"", move_destination_directory);
+    } else {
+      auto target_path = utils::file::FileUtils::concat_path(move_destination_directory, child_path);
+      if (!client->rename(remote_file, target_path, false /*overwrite*/)) {
+        logger_->log_warn("Completion Strategy is Move File, but failed to move file \"%s\" to \"%s\"", remote_file, target_path);
+      }
+    }
+  }
+
+  session->transfer(flow_file, Success);
+  put_connection_back_to_cache();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
new file mode 100644
index 0000000..749b12e
--- /dev/null
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FETCH_SFTP_H__
+#define __FETCH_SFTP_H__
+
+#include <memory>
+#include <string>
+
+#include "SFTPProcessorBase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FetchSFTP : public SFTPProcessorBase {
+ public:
+
+  static constexpr char const *COMPLETION_STRATEGY_NONE = "None";
+  static constexpr char const *COMPLETION_STRATEGY_MOVE_FILE = "Move File";
+  static constexpr char const *COMPLETION_STRATEGY_DELETE_FILE = "Delete File";
+
+  static constexpr char const* ProcessorName = "FetchSFTP";
+
+
+  /*!
+   * Create a new processor
+   */
+  FetchSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
+  virtual ~FetchSFTP();
+
+  // Supported Properties
+  static core::Property RemoteFile;
+  static core::Property CompletionStrategy;
+  static core::Property MoveDestinationDirectory;
+  static core::Property CreateDirectory;
+  static core::Property DisableDirectoryListing;
+  static core::Property UseCompression;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship CommsFailure;
+  static core::Relationship NotFound;
+  static core::Relationship PermissionDenied;
+
+  // Writes Attributes
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_PORT= "sftp.remote.port";
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_FILENAME = "sftp.remote.filename";
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  virtual void initialize() override;
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(const std::string& remote_file,
+                 utils::SFTPClient& client);
+    ~WriteCallback();
+    virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+
+   private:
+    std::shared_ptr<logging::Logger> logger_;
+    const std::string remote_file_;
+    utils::SFTPClient& client_;
+  };
+
+ private:
+
+  std::string completion_strategy_;
+  bool create_directory_;
+  bool disable_directory_listing_;
+};
+
+REGISTER_RESOURCE(FetchSFTP, "Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
new file mode 100644
index 0000000..e4b56be
--- /dev/null
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -0,0 +1,1173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <list>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/ostreamwrapper.h"
+#include "rapidjson/istreamwrapper.h"
+#include "rapidjson/writer.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property ListSFTP::ListingStrategy(
+    core::PropertyBuilder::createProperty("Listing Strategy")->withDescription("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({LISTING_STRATEGY_TRACKING_TIMESTAMPS,
+                                            LISTING_STRATEGY_TRACKING_ENTITIES})
+        ->withDefaultValue(LISTING_STRATEGY_TRACKING_TIMESTAMPS)->build());
+core::Property ListSFTP::RemotePath(
+    core::PropertyBuilder::createProperty("Remote Path")->withDescription("The fully qualified filename on the remote system")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property ListSFTP::SearchRecursively(
+    core::PropertyBuilder::createProperty("Search Recursively")->withDescription("If true, will pull files from arbitrarily nested subdirectories; "
+                                                                                 "otherwise, will not traverse subdirectories")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property ListSFTP::FollowSymlink(
+    core::PropertyBuilder::createProperty("Follow symlink")->withDescription("If true, will pull even symbolic files and also nested symbolic subdirectories; "
+                                                                             "otherwise, will not read symbolic files and will not traverse symbolic link subdirectories")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property ListSFTP::FileFilterRegex(
+    core::PropertyBuilder::createProperty("File Filter Regex")->withDescription("Provides a Java Regular Expression for filtering Filenames; "
+                                                                                "if a filter is supplied, only files whose names match that Regular Expression will be fetched")
+        ->isRequired(false)->build());
+core::Property ListSFTP::PathFilterRegex(
+    core::PropertyBuilder::createProperty("Path Filter Regex")->withDescription("When Search Recursively is true, then only subdirectories whose path matches the given Regular Expression will be scanned")
+        ->isRequired(false)->build());
+core::Property ListSFTP::IgnoreDottedFiles(
+    core::PropertyBuilder::createProperty("Ignore Dotted Files")->withDescription("If true, files whose names begin with a dot (\".\") will be ignored")
+        ->isRequired(true)->withDefaultValue<bool>(true)->build());
+core::Property ListSFTP::TargetSystemTimestampPrecision(
+    core::PropertyBuilder::createProperty("Target System Timestamp Precision")->withDescription("Specify timestamp precision at the target system. "
+                                                                                                "Since this processor uses timestamp of entities to decide which should be listed, "
+                                                                                                "it is crucial to use the right timestamp precision.")
+        ->isRequired(true)
+        ->withAllowableValues<std::string>({TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS,
+                                            TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES})
+        ->withDefaultValue(TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT)->build());
+core::Property ListSFTP::EntityTrackingTimeWindow(
+    core::PropertyBuilder::createProperty("Entity Tracking Time Window")->withDescription("Specify how long this processor should track already-listed entities. "
+                                                                                          "'Tracking Entities' strategy can pick any entity whose timestamp is inside the specified time window. "
+                                                                                          "For example, if set to '30 minutes', any entity having timestamp in recent 30 minutes will be the listing target when this processor runs. "
+                                                                                          "A listed entity is considered 'new/updated' and a FlowFile is emitted if one of following condition meets: "
+                                                                                          "1. does not exist in the already-listed entities, "
+                                                                                          "2. has newer timestamp than the cached entity, "
+                                                                                          "3. has different size than the cached entity. "
+                                                                                          "If a cached entity's timestamp becomes older than specified time window, that entity will be removed from the cached already-listed entities. "
+                                                                                          "Used by 'Tracking Entities' strategy.")
+        ->isRequired(false)->build());
+core::Property ListSFTP::EntityTrackingInitialListingTarget(
+    core::PropertyBuilder::createProperty("Entity Tracking Initial Listing Target")->withDescription("Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.")
+        ->withAllowableValues<std::string>({ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW,
+                                            ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE})
+        ->isRequired(false)->withDefaultValue(ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)->build());
+core::Property ListSFTP::MinimumFileAge(
+    core::PropertyBuilder::createProperty("Minimum File Age")->withDescription("The minimum age that a file must be in order to be pulled; "
+                                                                               "any file younger than this amount of time (according to last modification date) will be ignored")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("0 sec")->build());
+core::Property ListSFTP::MaximumFileAge(
+    core::PropertyBuilder::createProperty("Maximum File Age")->withDescription("The maximum age that a file must be in order to be pulled; "
+                                                                               "any file older than this amount of time (according to last modification date) will be ignored")
+        ->isRequired(false)->build());
+core::Property ListSFTP::MinimumFileSize(
+    core::PropertyBuilder::createProperty("Minimum File Size")->withDescription("The minimum size that a file must be in order to be pulled")
+        ->isRequired(true)->withDefaultValue<core::DataSizeValue>("0 B")->build());
+core::Property ListSFTP::MaximumFileSize(
+    core::PropertyBuilder::createProperty("Maximum File Size")->withDescription("The maximum size that a file must be in order to be pulled")
+        ->isRequired(false)->build());
+core::Property ListSFTP::StateFile(
+    core::PropertyBuilder::createProperty("State File")->withDescription("Specifies the file that should be used for storing state about"
+                                                                         " what data has been ingested so that upon restart MiNiFi can resume from where it left off")
+        ->isRequired(true)->withDefaultValue("ListSFTP")->build());
+
+core::Relationship ListSFTP::Success("success", "All FlowFiles that are received are routed to success");
+
+const std::map<std::string, uint64_t> ListSFTP::LISTING_LAG_MAP = {
+  {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS, 1000},
+  {ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES, 60000},
+};
+
+void ListSFTP::initialize() {
+  logger_->log_trace("Initializing FetchSFTP");
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  addSupportedCommonProperties(properties);
+  properties.insert(ListingStrategy);
+  properties.insert(RemotePath);
+  properties.insert(SearchRecursively);
+  properties.insert(FollowSymlink);
+  properties.insert(FileFilterRegex);
+  properties.insert(PathFilterRegex);
+  properties.insert(IgnoreDottedFiles);
+  properties.insert(TargetSystemTimestampPrecision);
+  properties.insert(EntityTrackingTimeWindow);
+  properties.insert(EntityTrackingInitialListingTarget);
+  properties.insert(MinimumFileAge);
+  properties.insert(MaximumFileAge);
+  properties.insert(MinimumFileSize);
+  properties.insert(MaximumFileSize);
+  properties.insert(StateFile);
+  setSupportedProperties(properties);
+
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
+}
+
+ListSFTP::ListSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : SFTPProcessorBase(name, uuid)
+    , search_recursively_(false)
+    , follow_symlink_(false)
+    , file_filter_regex_set_(false)
+    , path_filter_regex_set_(false)
+    , ignore_dotted_files_(false)
+    , minimum_file_age_(0U)
+    , maximum_file_age_(0U)
+    , minimum_file_size_(0U)
+    , maximum_file_size_(0U)
+    , already_loaded_from_cache_(false)
+    , last_listed_latest_entry_timestamp_(0U)
+    , last_processed_latest_entry_timestamp_(0U)
+    , initial_listing_complete_(false) {
+  logger_ = logging::LoggerFactory<ListSFTP>::getLogger();
+}
+
+ListSFTP::~ListSFTP() {
+#ifndef WIN32
+  if (file_filter_regex_set_) {
+    regfree(&compiled_file_filter_regex_);
+  }
+  if (path_filter_regex_set_) {
+    regfree(&compiled_path_filter_regex_);
+  }
+#endif
+}
+
+void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  parseCommonPropertiesOnSchedule(context);
+
+  std::string value;
+  context->getProperty(ListingStrategy.getName(), listing_strategy_);
+  if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {
+    invalidateCache();
+  }
+  last_listing_strategy_ = listing_strategy_;
+  if (!context->getProperty(SearchRecursively.getName(), value)) {
+    logger_->log_error("Search Recursively attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, search_recursively_);
+  }
+  if (!context->getProperty(FollowSymlink.getName(), value)) {
+    logger_->log_error("Follow symlink attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, follow_symlink_);
+  }
+  if (context->getProperty(FileFilterRegex.getName(), file_filter_regex_)) {
+#ifndef WIN32
+    if (file_filter_regex_set_) {
+      regfree(&compiled_file_filter_regex_);
+    }
+    int ret = regcomp(&compiled_file_filter_regex_, file_filter_regex_.c_str(), 0);
+    if (ret != 0) {
+      logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+      file_filter_regex_set_ = false;
+    } else {
+      file_filter_regex_set_ = true;
+    }
+#else
+    try {
+      compiled_file_filter_regex_ = std::regex(file_filter_regex_);
+      file_filter_regex_set_ = true;
+    } catch (std::regex_error&) {
+      logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str());
+      file_filter_regex_set_ = false;
+    }
+#endif
+  } else {
+    file_filter_regex_set_ = false;
+  }
+  if (context->getProperty(PathFilterRegex.getName(), path_filter_regex_)) {
+#ifndef WIN32
+    if (path_filter_regex_set_) {
+      regfree(&compiled_path_filter_regex_);
+    }
+    int ret = regcomp(&compiled_path_filter_regex_, path_filter_regex_.c_str(), 0);
+    if (ret != 0) {
+      logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+      file_filter_regex_set_ = false;
+    } else {
+      path_filter_regex_set_ = true;
+    }
+#else
+    try {
+      compiled_path_filter_regex_ = std::regex(path_filter_regex_);
+      path_filter_regex_set_ = true;
+    } catch (std::regex_error&) {
+      logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str());
+      path_filter_regex_set_ = false;
+    }
+#endif
+  } else {
+    path_filter_regex_set_ = false;
+  }
+  if (!context->getProperty(IgnoreDottedFiles.getName(), value)) {
+    logger_->log_error("Ignore Dotted Files attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, ignore_dotted_files_);
+  }
+  context->getProperty(TargetSystemTimestampPrecision.getName(), target_system_timestamp_precision_);
+  context->getProperty(EntityTrackingInitialListingTarget.getName(), entity_tracking_initial_listing_target_);
+  if (!context->getProperty(MinimumFileAge.getName(), value)) {
+    logger_->log_error("Minimum File Age attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, minimum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(minimum_file_age_, unit, minimum_file_age_)) {
+      logger_->log_error("Minimum File Age attribute is invalid");
+    }
+  }
+  if (context->getProperty(MaximumFileAge.getName(), value)) {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, maximum_file_age_, unit) || !core::Property::ConvertTimeUnitToMS(maximum_file_age_, unit, maximum_file_age_)) {
+      logger_->log_error("Maximum File Age attribute is invalid");
+    }
+  }
+  if (!context->getProperty(MinimumFileSize.getName(), minimum_file_size_)) {
+    logger_->log_error("Minimum File Size attribute is invalid");
+  }
+  if (context->getProperty(MaximumFileSize.getName(), value)) {
+    if (!core::DataSizeValue::StringToInt(value, maximum_file_size_)) {
+      logger_->log_error("Maximum File Size attribute is invalid");
+    }
+  }
+  context->getProperty(StateFile.getName(), value);
+  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
+    std::stringstream ss;
+    ss << value << "." << getUUIDStr() << ".TrackingTimestamps";
+    auto new_tracking_timestamps_state_filename = ss.str();
+    if (new_tracking_timestamps_state_filename != tracking_timestamps_state_filename_) {
+      if (!tracking_timestamps_state_filename_.empty()) {
+        if (unlink(tracking_timestamps_state_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Timestamps state file \"%s\"",
+                             tracking_timestamps_state_filename_.c_str());
+        }
+      }
+    }
+    tracking_timestamps_state_filename_ = new_tracking_timestamps_state_filename;
+  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
+    std::stringstream ss;
+    ss << value << "." << getUUIDStr() << ".TrackingEntities";
+    auto new_tracking_entities_state_filename = ss.str();
+    ss << ".json";
+    auto new_tracking_entities_state_json_filename = ss.str();
+    if (new_tracking_entities_state_filename != tracking_entities_state_filename_) {
+      if (!tracking_entities_state_filename_.empty()) {
+        if (unlink(tracking_entities_state_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Entities state file \"%s\"",
+                             tracking_entities_state_filename_.c_str());
+        }
+      }
+      if (!tracking_entities_state_json_filename_.empty()) {
+        if (unlink(tracking_entities_state_json_filename_.c_str()) != 0) {
+          logger_->log_error("Unable to delete old Tracking Entities json state file \"%s\"",
+                             tracking_entities_state_json_filename_.c_str());
+        }
+      }
+    }
+    tracking_entities_state_filename_ = new_tracking_entities_state_filename;
+    tracking_entities_state_json_filename_ = new_tracking_entities_state_json_filename;
+  } else {
+    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+  }
+
+  startKeepaliveThreadIfNeeded();
+}
+
+void ListSFTP::invalidateCache() {
+  logger_->log_warn("Important properties have been reconfigured, invalidating in-memory cache");
+
+  already_loaded_from_cache_ = false;
+
+  last_run_time_ = std::chrono::time_point<std::chrono::steady_clock>();
+  last_listed_latest_entry_timestamp_ = 0U;
+  last_processed_latest_entry_timestamp_ = 0U;
+  latest_identifiers_processed_.clear();
+
+  initial_listing_complete_ = false;
+  already_listed_entities_.clear();
+}
+
+ListSFTP::Child::Child()
+    :directory(false) {
+  memset(&attrs, 0x00, sizeof(attrs));
+}
+
+ListSFTP::Child::Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child) {
+  parent_path = parent_path_;
+  std::tie(filename, std::ignore, attrs) = std::move(sftp_child);
+  directory = LIBSSH2_SFTP_S_ISDIR(attrs.permissions);
+}
+
+std::string ListSFTP::Child::getPath() const {
+  return utils::file::FileUtils::concat_path(parent_path, filename, true /*force_posix*/);
+}
+
+bool ListSFTP::filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child) {
+  const std::string& filename = std::get<0>(sftp_child);
+  const LIBSSH2_SFTP_ATTRIBUTES& attrs = std::get<2>(sftp_child);
+  /* This should not happen */
+  if (filename.empty()) {
+    logger_->log_error("Listing directory \"%s\" returned an empty child", parent_path.c_str());
+    return false;
+  }
+  /* Ignore current dir and parent dir */
+  if (filename == "." || filename == "..") {
+    return false;
+  }
+  /* Dotted files */
+  if (ignore_dotted_files_ && filename[0] == '.') {
+    logger_->log_debug("Ignoring \"%s/%s\" because Ignore Dotted Files is true", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+  if (!(attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS)) {
+    // TODO: maybe do a fallback stat here
+    logger_->log_error("Failed to get permissions in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+  if (LIBSSH2_SFTP_S_ISREG(attrs.permissions)) {
+    return filterFile(parent_path, filename, attrs);
+  } else if (LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+    return filterDirectory(parent_path, filename, attrs);
+  } else {
+    logger_->log_debug("Skipping non-regular, non-directory file \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+}
+
+bool ListSFTP::filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
+  if (!(attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) ||
+      !(attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) ||
+      !(attrs.flags & LIBSSH2_SFTP_ATTR_ACMODTIME)) {
+    // TODO: maybe do a fallback stat here
+    logger_->log_error("Failed to get all attributes in stat for \"%s/%s\"", parent_path.c_str(), filename.c_str());
+    return false;
+  }
+
+  /* Age */
+  time_t now = time(nullptr);
+  int64_t file_age = (now - attrs.mtime) * 1000;
+  if (file_age < minimum_file_age_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is younger than the Minimum File Age: %ld ms < %lu ms",
+        parent_path.c_str(),
+        filename.c_str(),
+        file_age,
+        minimum_file_age_);
+    return false;
+  }
+  if (maximum_file_age_ != 0U && file_age > maximum_file_age_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is older than the Maximum File Age: %ld ms > %lu ms",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       file_age,
+                       maximum_file_age_);
+    return false;
+  }
+
+  /* Size */
+  if (attrs.filesize < minimum_file_size_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is smaller than the Minimum File Size: %lu B < %lu B",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       attrs.filesize,
+                       minimum_file_size_);
+    return false;
+  }
+  if (maximum_file_size_ != 0U && attrs.filesize > maximum_file_size_) {
+    logger_->log_debug("Ignoring \"%s/%s\" because it is larger than the Maximum File Size: %lu B > %lu B",
+                       parent_path.c_str(),
+                       filename.c_str(),
+                       attrs.filesize,
+                       maximum_file_size_);
+    return false;
+  }
+
+  /* File Filter Regex */
+  if (file_filter_regex_set_) {
+    bool match = false;
+#ifndef WIN32
+    int ret = regexec(&compiled_file_filter_regex_, filename.c_str(), static_cast<size_t>(0), nullptr, 0);
+    match = ret == 0;
+#else
+    match = std::regex_match(filename, compiled_file_filter_regex_);
+#endif
+    if (!match) {
+      logger_->log_debug("Ignoring \"%s/%s\" because it did not match the File Filter Regex \"%s\"",
+                         parent_path.c_str(),
+                         filename.c_str(),
+                         file_filter_regex_);
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool ListSFTP::filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs) {
+  if (!search_recursively_) {
+    return false;
+  }
+
+  /* Path Filter Regex */
+  if (path_filter_regex_set_) {
+    std::string dir_path = utils::file::FileUtils::concat_path(parent_path, filename, true /*force_posix*/);
+    bool match = false;
+#ifndef WIN32
+    int ret = regexec(&compiled_path_filter_regex_, dir_path.c_str(), static_cast<size_t>(0), nullptr, 0);
+    match = ret == 0;
+#else
+    match = std::regex_match(dir_path, compiled_path_filter_regex_);
+#endif
+    if (!match) {
+      logger_->log_debug("Not recursing into \"%s\" because it did not match the Path Filter Regex \"%s\"",
+                         dir_path.c_str(),
+                         path_filter_regex_);
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool ListSFTP::createAndTransferFlowFileFromChild(
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const ListSFTP::Child& child) {
+  /* Convert mtime to string */
+  if (child.attrs.mtime > std::numeric_limits<int64_t>::max()) {
+    logger_->log_error("Modification date %lu of \"%s/%s\" larger than int64_t max", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+    return true;
+  }
+  std::string mtime_str;
+  if (!getDateTimeStr(static_cast<int64_t>(child.attrs.mtime), mtime_str)) {
+    logger_->log_error("Failed to convert modification date %lu of \"%s/%s\" to string", child.attrs.mtime, child.parent_path.c_str(), child.filename.c_str());
+    return true;
+  }
+
+  /* Create FlowFile */
+  std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+  if (flow_file == nullptr) {
+    logger_->log_error("Failed to create FlowFileRecord");
+    return false;
+  }
+
+  /* Set attributes */
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_HOST, hostname);
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_REMOTE_PORT, std::to_string(port));
+  session->putAttribute(flow_file, ATTRIBUTE_SFTP_LISTING_USER, username);
+
+  /* uid and gid */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_OWNER, std::to_string(child.attrs.uid));
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_GROUP, std::to_string(child.attrs.gid));
+
+  /* permissions */
+  std::stringstream ss;
+  ss << std::setfill('0') << std::setw(4) << std::oct << (child.attrs.permissions & 0777);
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_PERMISSIONS, ss.str());
+
+  /* filesize */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_SIZE, std::to_string(child.attrs.filesize));
+
+  /* mtime */
+  session->putAttribute(flow_file, ATTRIBUTE_FILE_LASTMODIFIEDTIME, mtime_str);
+
+  flow_file->updateKeyedAttribute(FILENAME, child.filename);
+  flow_file->updateKeyedAttribute(PATH, child.parent_path);
+
+  session->transfer(flow_file, Success);
+
+  return true;
+}
+
+ListSFTP::ListedEntity::ListedEntity()
+    : timestamp(0U)
+    , size(0U) {
+}
+
+ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
+    : timestamp(timestamp_)
+    , size(size_) {
+}
+
+bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ofstream file(tracking_timestamps_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  file << "hostname=" << hostname << "\n";
+  file << "username=" << username << "\n";
+  file << "remote_path=" << remote_path << "\n";
+  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
+  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+  size_t i = 0;
+  for (const auto& identifier : latest_identifiers_processed_) {
+    file << "id." << i << "=" << identifier << "\n";
+    ++i;
+  }
+  return true;
+}
+
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ifstream file(tracking_timestamps_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    return false;
+  }
+  std::string state_hostname;
+  std::string state_username;
+  std::string state_remote_path;
+  uint64_t state_listing_timestamp;
+  uint64_t state_processed_timestamp;
+  std::set<std::string> state_ids;
+
+  std::string line;
+  while (std::getline(file, line)) {
+    size_t separator_pos = line.find('=');
+    if (separator_pos == std::string::npos) {
+      logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str());
+    }
+    std::string key = line.substr(0, separator_pos);
+    std::string value = line.substr(separator_pos + 1);
+    if (key == "hostname") {
+      state_hostname = std::move(value);
+    } else if (key == "username") {
+      state_username = std::move(value);
+    } else if (key == "remote_path") {
+      state_remote_path = std::move(value);
+    } else if (key == "listing.timestamp") {
+      try {
+        state_listing_timestamp = stoull(value);
+      } catch (...) {
+        logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+        return false;
+      }
+    } else if (key == "processed.timestamp") {
+      try {
+        state_processed_timestamp = stoull(value);
+      } catch (...) {
+        logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+        return false;
+      }
+    } else if (key.compare(0, strlen("id."), "id.") == 0) {
+      state_ids.emplace(std::move(value));
+    } else {
+      logger_->log_warn("Unknown key found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), key.c_str());
+    }
+  }
+  file.close();
+
+  if (state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error("Tracking Timestamps state file \"%s\" was created with different settings than the current ones, ignoring. "
+                       "Hostname: \"%s\" vs. \"%s\", "
+                       "Username: \"%s\" vs. \"%s\", "
+                       "Remote Path: \"%s\" vs. \"%s\"",
+                       tracking_timestamps_state_filename_.c_str(),
+                       state_hostname, hostname,
+                       state_username, username,
+                       state_remote_path, remote_path);
+    return false;
+  }
+
+  last_listed_latest_entry_timestamp_ = state_listing_timestamp;
+  last_processed_latest_entry_timestamp_ = state_processed_timestamp;
+  latest_identifiers_processed_ = std::move(state_ids);
+
+  return true;
+}
+
+void ListSFTP::listByTrackingTimestamps(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    std::vector<Child>&& files) {
+  uint64_t min_timestamp_to_list = last_listed_latest_entry_timestamp_;
+
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_timestamps_state_filename_.empty()) {
+    if (updateFromTrackingTimestampsCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    } else {
+      logger_->log_debug("Failed to load Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  std::chrono::time_point<std::chrono::steady_clock> current_run_time = std::chrono::steady_clock::now();
+  time_t now = time(nullptr);
+
+  /* Order children by timestamp and try to detect timestamp precision if needed  */
+  std::map<uint64_t /*timestamp*/, std::list<Child>> ordered_files;
+  bool target_system_has_seconds = false;
+  for (auto&& file : files) {
+    uint64_t timestamp = file.attrs.mtime * 1000;
+    target_system_has_seconds |= timestamp % 60000 != 0;
+
+    bool new_file = min_timestamp_to_list == 0U || (timestamp >= min_timestamp_to_list && timestamp >= last_processed_latest_entry_timestamp_);
+    if (new_file) {
+      auto& files_for_timestamp = ordered_files[timestamp];
+      files_for_timestamp.emplace_back(std::move(file));
+    } else {
+      logger_->log_trace("Skipping \"%s\", because it is not new.", file.getPath().c_str());
+    }
+  }
+
+  uint64_t latest_listed_entry_timestamp_this_cycle = 0U;
+  size_t flow_files_created = 0U;
+  if (ordered_files.size() > 0) {
+    latest_listed_entry_timestamp_this_cycle = ordered_files.crbegin()->first;
+
+    std::string remote_system_timestamp_precision;
+    if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT) {
+      if (target_system_has_seconds) {
+        logger_->log_debug("Precision auto detection detected second precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+      } else {
+        logger_->log_debug("Precision auto detection detected minute precision");
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+      }
+    } else if (target_system_timestamp_precision_ == TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES) {
+        remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES;
+    } else {
+      /*
+       * We only have seconds-precision timestamps, TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS makes no real sense here,
+       * so we will treat it as TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS.
+       */
+      remote_system_timestamp_precision = TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS;
+    }
+    uint64_t listing_lag = LISTING_LAG_MAP.at(remote_system_timestamp_precision);
+    logger_->log_debug("The listing lag is %lu ms", listing_lag);
+
+    /* If the latest listing time is equal to the last listing time, there are no entries with a newer timestamp than previously seen */
+    if (latest_listed_entry_timestamp_this_cycle == last_listed_latest_entry_timestamp_) {
+      const auto& latest_files = ordered_files.at(latest_listed_entry_timestamp_this_cycle);
+      uint64_t elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(current_run_time - last_run_time_).count();
+      /* If a precision-specific listing lag has not yet elapsed since out last execution, we wait. */
+      if (elapsed_time < listing_lag) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
+                           "and the listing lag has not yet elapsed (%lu ms < % lu ms). Yielding.",
+                           latest_listed_entry_timestamp_this_cycle,
+                           elapsed_time,
+                           listing_lag);
+        context->yield();
+        return;
+      }
+      /*
+       * If we have already processed the entities with the newest timestamp,
+       * and there are no new entities with that timestamp, there is nothing to do.
+       */
+      if (latest_listed_entry_timestamp_this_cycle == last_processed_latest_entry_timestamp_ &&
+          std::all_of(latest_files.begin(), latest_files.end(), [this](const Child& child) {
+            return latest_identifiers_processed_.count(child.getPath()) == 1U;
+          })) {
+        logger_->log_debug("The latest listed entry timestamp is the same as the last listed entry timestamp (%lu) "
+                           "and all files for that timestamp has been processed. Yielding.", latest_listed_entry_timestamp_this_cycle);
+        context->yield();
+        return;
+      }
+    } else {
+      /* Determine the minimum reliable timestamp based on precision */
+      uint64_t minimum_reliable_timestamp = now * 1000 - listing_lag;
+      if (remote_system_timestamp_precision == TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS) {
+        minimum_reliable_timestamp -= minimum_reliable_timestamp % 1000;
+      } else {
+        minimum_reliable_timestamp -= minimum_reliable_timestamp % 60000;
+      }
+      /* If the latest timestamp is not old enough, we wait another cycle */
+      if (minimum_reliable_timestamp < latest_listed_entry_timestamp_this_cycle) {
+        logger_->log_debug("Skipping files with latest timestamp because their modification date is not smaller than the minimum reliable timestamp: %lu ms >= %lu ms",
+            latest_listed_entry_timestamp_this_cycle,
+            minimum_reliable_timestamp);
+        ordered_files.erase(latest_listed_entry_timestamp_this_cycle);
+      }
+    }
+
+    for (auto& files_for_timestamp : ordered_files) {
+      if (files_for_timestamp.first == last_processed_latest_entry_timestamp_) {
+        /* Filter out previously processed entities. */
+        for (auto it = files_for_timestamp.second.begin(); it != files_for_timestamp.second.end();) {
+          if (latest_identifiers_processed_.count(it->getPath()) != 0U) {
+            it = files_for_timestamp.second.erase(it);
+          } else {
+            ++it;
+          }
+        }
+      }
+      for (const auto& file : files_for_timestamp.second) {
+        /* Create the FlowFile for this path */
+        if (createAndTransferFlowFileFromChild(session, hostname, port, username, file)) {
+          flow_files_created++;
+        } else {
+          logger_->log_error("Failed to emit FlowFile for \"%s\"", file.filename);
+          context->yield();
+          return;
+        }
+      }
+    }
+  }
+
+  /* If we have a listing timestamp, it is worth persisting the state */
+  if (latest_listed_entry_timestamp_this_cycle != 0U) {
+    bool processed_new_files = flow_files_created > 0U;
+    if (processed_new_files) {
+      auto last_files_it = ordered_files.crbegin();
+      if (last_files_it->first != last_processed_latest_entry_timestamp_) {
+        latest_identifiers_processed_.clear();
+      }
+
+      for (const auto& last_file : last_files_it->second) {
+        latest_identifiers_processed_.insert(last_file.getPath());
+      }
+
+      last_processed_latest_entry_timestamp_ = last_files_it->first;
+    }
+
+    last_run_time_ = current_run_time;
+
+    if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
+      last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
+      if (!tracking_timestamps_state_filename_.empty()) {
+        persistTrackingTimestampsCache(hostname, username, remote_path);
+      }
+    }
+  } else {
+    logger_->log_debug("There are no files to list. Yielding.");
+    context->yield();
+    return;
+  }
+}
+
+bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ofstream file(tracking_entities_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+  file << "hostname=" << hostname << "\n";
+  file << "username=" << username << "\n";
+  file << "remote_path=" << remote_path << "\n";
+  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
+  file.close();
+
+  std::ofstream json_file(tracking_entities_state_json_filename_);
+  if (!json_file.is_open()) {
+    logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str());
+    return false;
+  }
+
+  rapidjson::Document entities(rapidjson::kObjectType);
+  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
+  for (const auto& already_listed_entity : already_listed_entities_) {
+    rapidjson::Value entity(rapidjson::kObjectType);
+    entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc);
+    entity.AddMember("size", already_listed_entity.second.size, alloc);
+    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc);
+  }
+
+  rapidjson::OStreamWrapper osw(json_file);
+  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
+  entities.Accept(writer);
+
+  return true;
+}
+
+bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::ifstream file(tracking_entities_state_filename_);
+  if (!file.is_open()) {
+    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+  std::string state_hostname;
+  std::string state_username;
+  std::string state_remote_path;
+  std::string state_json_state_file;
+
+  std::string line;
+  while (std::getline(file, line)) {
+    size_t separator_pos = line.find('=');
+    if (separator_pos == std::string::npos) {
+      logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
+      continue;
+    }
+    std::string key = line.substr(0, separator_pos);
+    std::string value = line.substr(separator_pos + 1);
+    if (key == "hostname") {
+      state_hostname = std::move(value);
+    } else if (key == "username") {
+      state_username = std::move(value);
+    } else if (key == "remote_path") {
+      state_remote_path = std::move(value);
+    } else if (key == "json_state_file") {
+      state_json_state_file = std::move(value);
+    } else {
+      logger_->log_warn("Unknown key found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
+    }
+  }
+  file.close();
+
+  if (state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. "
+                       "Hostname: \"%s\" vs. \"%s\", "
+                       "Username: \"%s\" vs. \"%s\", "
+                       "Remote Path: \"%s\" vs. \"%s\"",
+                       tracking_entities_state_filename_.c_str(),
+                       state_hostname, hostname,
+                       state_username, username,
+                       state_remote_path, remote_path);
+    return false;
+  }
+
+  if (state_json_state_file.empty()) {
+    logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    return false;
+  }
+
+  std::ifstream json_file(state_json_state_file);
+  if (!json_file.is_open()) {
+    logger_->log_error("Failed to open entities Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+    return false;
+  }
+
+  try {
+    rapidjson::IStreamWrapper isw(json_file);
+    rapidjson::Document d;
+    rapidjson::ParseResult res = d.ParseStream(isw);
+    if (!res) {
+      logger_->log_error("Failed to parse Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+      return false;
+    }
+    if (!d.IsObject()) {
+      logger_->log_error("Tracking Entities state json file \"%s\" root is not an object", state_json_state_file.c_str());
+      return false;
+    }
+
+    std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
+    for (const auto &already_listed_entity : d.GetObject()) {
+      auto it = already_listed_entity.value.FindMember("timestamp");
+      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" timestamp missing or malformatted for entity \"%s\"",
+            state_json_state_file.c_str(),
+            already_listed_entity.name.GetString());
+        continue;
+      }
+      uint64_t timestamp = it->value.GetUint64();
+      it = already_listed_entity.value.FindMember("size");
+      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
+        logger_->log_error("Tracking Entities state json file \"%s\" size missing or malformatted for entity \"%s\"",
+                           state_json_state_file.c_str(),
+                           already_listed_entity.name.GetString());
+        continue;
+      }
+      uint64_t size = it->value.GetUint64();
+      new_already_listed_entities.emplace(std::piecewise_construct,
+                                          std::forward_as_tuple(already_listed_entity.name.GetString()),
+                                          std::forward_as_tuple(timestamp, size));
+    }
+    already_listed_entities_ = std::move(new_already_listed_entities);
+  } catch (std::exception& e) {
+    logger_->log_error("Exception while parsing Tracking Entities state json file \"%s\": %s", state_json_state_file.c_str(), e.what());
+    return false;
+  }
+
+  return true;
+}
+
+void ListSFTP::listByTrackingEntities(
+    const std::shared_ptr<core::ProcessContext>& context,
+    const std::shared_ptr<core::ProcessSession>& session,
+    const std::string& hostname,
+    uint16_t port,
+    const std::string& username,
+    const std::string& remote_path,
+    uint64_t entity_tracking_time_window,
+    std::vector<Child>&& files) {
+  /* Load state from cache file if needed */
+  if (!already_loaded_from_cache_ && !tracking_entities_state_filename_.empty()) {
+    if (updateFromTrackingEntitiesCache(hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+      initial_listing_complete_ = true;
+    } else {
+      logger_->log_debug("Failed to load Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+    }
+    already_loaded_from_cache_ = true;
+  }
+
+  time_t now = time(nullptr);
+  uint64_t min_timestamp_to_list = (!initial_listing_complete_ && entity_tracking_initial_listing_target_ == ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE)
+      ? 0U : (now * 1000 - entity_tracking_time_window);
+
+  /* Skip files not in the tracking window */
+  for (auto it = files.begin(); it != files.end(); ) {
+    if (it->attrs.mtime * 1000 < min_timestamp_to_list) {
+      logger_->log_trace("Skipping \"%s\" because it has an older timestamp than the minimum timestamp to list: %lu < %lu",
+          it->getPath(), it->attrs.mtime * 1000, min_timestamp_to_list);
+      it = files.erase(it);
+    } else {
+      ++it;
+    }
+  }
+
+  if (files.empty()) {
+    logger_->log_debug("No entities to list within the tracking time window");
+    context->yield();
+    return;
+  }
+
+  /* Find files that have been updated */
+  std::vector<Child> updated_entities;
+  std::copy_if(std::make_move_iterator(files.begin()),
+               std::make_move_iterator(files.end()),
+               std::back_inserter(updated_entities),
+               [&](const Child& child) {
+     auto already_listed_it = already_listed_entities_.find(child.getPath());
+     if (already_listed_it == already_listed_entities_.end()) {
+       logger_->log_trace("Found new file \"%s\"", child.getPath());
+       return true;
+     }
+
+     if (child.attrs.mtime * 1000 > already_listed_it->second.timestamp) {
+       logger_->log_trace("Found file \"%s\" with newer timestamp: %lu -> %lu",
+           child.getPath(),
+           already_listed_it->second.timestamp,
+           child.attrs.mtime * 1000);
+       return true;
+     }
+
+     if (child.attrs.filesize != already_listed_it->second.size) {
+       logger_->log_trace("Found file \"%s\" with different size: %lu -> %lu",
+                          child.getPath(),
+                          already_listed_it->second.size,
+                          child.attrs.filesize);
+       return true;
+     }
+
+     logger_->log_trace("Skipping file \"%s\" because it has not changed", child.getPath());
+     return false;
+  });
+
+  /* Find entities in the tracking cache that are no longer in the tracking window */
+  std::vector<std::string> old_entity_ids;
+  for (const auto& already_listed_entity : already_listed_entities_) {
+    if (already_listed_entity.second.timestamp < min_timestamp_to_list) {
+      old_entity_ids.emplace_back(already_listed_entity.first);
+    }
+  }
+
+  /* If we have no new files and no expired tracked entities, we have nothing to do */
+  if (updated_entities.empty() && old_entity_ids.empty()) {
+    context->yield();
+    return;
+  }
+
+  /* Remove expired entities */
+  for (const auto& old_entity_id : old_entity_ids) {
+    already_listed_entities_.erase(old_entity_id);
+  }
+
+  for (const auto& updated_entity : updated_entities) {
+    /* Create the FlowFile for this path */
+    if (!createAndTransferFlowFileFromChild(session, hostname, port, username, updated_entity)) {
+      logger_->log_error("Failed to emit FlowFile for \"%s\"", updated_entity.getPath());
+      context->yield();
+      return;
+    }
+    already_listed_entities_[updated_entity.getPath()] = ListedEntity(updated_entity.attrs.mtime * 1000, updated_entity.attrs.filesize);
+  }
+
+  initial_listing_complete_ = true;
+
+  if (!tracking_entities_state_filename_.empty()) {
+    persistTrackingEntitiesCache(hostname, username, remote_path);
+  }
+}
+
+void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  /* Parse common properties */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, nullptr /*flow_file*/, common_properties)) {
+    context->yield();
+    return;
+  }
+
+  /* Parse processor-specific properties */
+  std::string remote_path;
+  uint64_t entity_tracking_time_window = 0U;
+
+  std::string value;
+  context->getProperty(RemotePath.getName(), remote_path);
+  /* Remove trailing slashes */
+  while (remote_path.size() > 1U && remote_path.back() == '/') {
+    remote_path.resize(remote_path.size() - 1);
+  }
+  if (context->getProperty(EntityTrackingTimeWindow.getName(), value)) {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, entity_tracking_time_window, unit) ||
+        !core::Property::ConvertTimeUnitToMS(entity_tracking_time_window, unit, entity_tracking_time_window)) {
+      /* The default is 3 hours	*/
+      entity_tracking_time_window = 3 * 3600 * 1000;
+      logger_->log_error("Entity Tracking Time Window attribute is invalid");
+    }
+  } else {
+    /* The default is 3 hours	*/
+    entity_tracking_time_window = 3 * 3600 * 1000;
+  }
+
+  /* Check whether we need to invalidate the cache based on the new properties */
+  if ((!last_hostname_.empty() && last_hostname_ != common_properties.hostname) ||
+      (!last_username_.empty() && last_username_ != common_properties.username) ||
+      (!last_remote_path_.empty() && last_remote_path_ != remote_path)) {
+    invalidateCache();
+  }
+  last_hostname_ = common_properties.hostname;
+  last_username_ = common_properties.username;
+  last_remote_path_ = remote_path;
+
+  /* Get SFTPClient from cache or create it */
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
+  if (client == nullptr) {
+    context->yield();
+    return;
+  }
+
+  /*
+   * Unless we're sure that the connection is good, we don't want to put it back to the cache.
+   * So we will only call this when we're sure that the connection is OK.
+   */
+  auto put_connection_back_to_cache = [this, &connection_cache_key, &client]() {
+    addConnectionToCache(connection_cache_key, std::move(client));
+  };
+
+  std::deque<Child> directories;
+  std::vector<Child> files;
+
+  /* Add initial directory */
+  Child root;
+  std::tie(root.parent_path, root.filename) = utils::file::FileUtils::split_path(remote_path, true /*force_posix*/);
+  root.directory = true;
+  directories.emplace_back(std::move(root));
+
+  /* Process directories */
+  while (!directories.empty()) {
+    auto directory = std::move(directories.front());
+    directories.pop_front();
+
+    std::string new_parent_path;
+    if (directory.parent_path.empty()) {
+      new_parent_path = directory.filename;
+    } else {
+      new_parent_path = directory.getPath();
+    }
+    std::vector<std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>> dir_children;
+    if (!client->listDirectory(new_parent_path, follow_symlink_, dir_children)) {
+      continue;
+    }
+    for (auto&& dir_child : dir_children) {
+      if (filter(new_parent_path, dir_child)) {
+        Child child(new_parent_path, std::move(dir_child));
+        if (child.directory) {
+          directories.emplace_back(std::move(child));
+        } else {
+          files.emplace_back(std::move(child));
+        }
+      }
+    }
+  }
+
+  /* Process the files with the appropriate tracking strategy */
+  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
+    listByTrackingTimestamps(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path, std::move(files));
+  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
+    listByTrackingEntities(context, session, common_properties.hostname, common_properties.port, common_properties.username, remote_path, entity_tracking_time_window, std::move(files));
+  } else {
+    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
+    context->yield();
+    return;
+  }
+
+  put_connection_back_to_cache();
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
new file mode 100644
index 0000000..4fe32e2
--- /dev/null
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LIST_SFTP_H__
+#define __LIST_SFTP_H__
+
+#include <memory>
+#include <string>
+#include <map>
+#include <chrono>
+#include <cstdint>
+#ifndef WIN32
+#include <regex.h>
+#else
+#include <regex>
+#endif
+
+#include "SFTPProcessorBase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class ListSFTP : public SFTPProcessorBase {
+ public:
+
+  static constexpr char const *LISTING_STRATEGY_TRACKING_TIMESTAMPS = "Tracking Timestamps";
+  static constexpr char const *LISTING_STRATEGY_TRACKING_ENTITIES = "Tracking Entities";
+
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT = "Auto Detect";
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_MILLISECONDS = "Milliseconds";
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_SECONDS = "Seconds";
+  static constexpr char const *TARGET_SYSTEM_TIMESTAMP_PRECISION_MINUTES = "Minutes";
+
+  static constexpr char const *ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW = "Tracking Time Window";
+  static constexpr char const *ENTITY_TRACKING_INITIAL_LISTING_TARGET_ALL_AVAILABLE = "All Available";
+
+  static constexpr char const* ProcessorName = "ListSFTP";
+
+
+  /*!
+   * Create a new processor
+   */
+  ListSFTP(std::string name, utils::Identifier uuid = utils::Identifier());
+  virtual ~ListSFTP();
+
+  // Supported Properties
+  static core::Property ListingStrategy;
+  static core::Property RemotePath;
+  static core::Property SearchRecursively;
+  static core::Property FollowSymlink;
+  static core::Property FileFilterRegex;
+  static core::Property PathFilterRegex;
+  static core::Property IgnoreDottedFiles;
+  static core::Property TargetSystemTimestampPrecision;
+  static core::Property EntityTrackingTimeWindow;
+  static core::Property EntityTrackingInitialListingTarget;
+  static core::Property MinimumFileAge;
+  static core::Property MaximumFileAge;
+  static core::Property MinimumFileSize;
+  static core::Property MaximumFileSize;
+  static core::Property StateFile;
+
+  // Supported Relationships
+  static core::Relationship Success;
+
+  // Writes Attributes
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_HOST = "sftp.remote.host";
+  static constexpr char const* ATTRIBUTE_SFTP_REMOTE_PORT = "sftp.remote.port";
+  static constexpr char const* ATTRIBUTE_SFTP_LISTING_USER = "sftp.listing.user";
+  static constexpr char const* ATTRIBUTE_FILE_OWNER = "file.owner";
+  static constexpr char const* ATTRIBUTE_FILE_GROUP = "file.group";
+  static constexpr char const* ATTRIBUTE_FILE_PERMISSIONS = "file.permissions";
+  static constexpr char const* ATTRIBUTE_FILE_SIZE = "file.size";
+  static constexpr char const* ATTRIBUTE_FILE_LASTMODIFIEDTIME = "file.lastModifiedTime";
+
+  static const std::map<std::string, uint64_t> LISTING_LAG_MAP;
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  virtual void initialize() override;
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ private:
+
+  std::string listing_strategy_;
+  bool search_recursively_;
+  bool follow_symlink_;
+  std::string file_filter_regex_;
+  std::string path_filter_regex_;
+  bool file_filter_regex_set_;
+  bool path_filter_regex_set_;
+#ifndef WIN32
+  regex_t compiled_file_filter_regex_;
+  regex_t compiled_path_filter_regex_;
+#else
+  std::regex compiled_file_filter_regex_;
+  std::regex compiled_path_filter_regex_;
+#endif
+  bool ignore_dotted_files_;
+  std::string target_system_timestamp_precision_;
+  std::string entity_tracking_initial_listing_target_;
+  uint64_t minimum_file_age_;
+  uint64_t maximum_file_age_;
+  uint64_t minimum_file_size_;
+  uint64_t maximum_file_size_;
+
+  std::string last_listing_strategy_;
+  std::string last_hostname_;
+  std::string last_username_;
+  std::string last_remote_path_;
+
+  struct Child {
+    Child();
+    Child(const std::string& parent_path_, std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>&& sftp_child);
+    std::string getPath() const;
+
+    bool directory;
+    std::string parent_path;
+    std::string filename;
+    LIBSSH2_SFTP_ATTRIBUTES attrs;
+  };
+
+  bool already_loaded_from_cache_;
+
+  std::string tracking_timestamps_state_filename_;
+  std::chrono::time_point<std::chrono::steady_clock> last_run_time_;
+  uint64_t last_listed_latest_entry_timestamp_;
+  uint64_t last_processed_latest_entry_timestamp_;
+  std::set<std::string> latest_identifiers_processed_;
+
+  bool initial_listing_complete_;
+  std::string tracking_entities_state_filename_;
+  std::string tracking_entities_state_json_filename_;
+  struct ListedEntity {
+    uint64_t timestamp;
+    uint64_t size;
+
+    ListedEntity();
+    ListedEntity(uint64_t timestamp, uint64_t size);
+  };
+  std::unordered_map<std::string, ListedEntity> already_listed_entities_;
+
+  void invalidateCache();
+
+  bool filter(const std::string& parent_path, const std::tuple<std::string /* filename */, std::string /* longentry */, LIBSSH2_SFTP_ATTRIBUTES /* attrs */>& sftp_child);
+  bool filterFile(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
+  bool filterDirectory(const std::string& parent_path, const std::string& filename, const LIBSSH2_SFTP_ATTRIBUTES& attrs);
+
+  bool createAndTransferFlowFileFromChild(
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const Child& child);
+
+  bool persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+
+  bool persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+
+  void listByTrackingTimestamps(
+      const std::shared_ptr<core::ProcessContext>& context,
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const std::string& remote_path,
+      std::vector<Child>&& files);
+
+  void listByTrackingEntities(
+      const std::shared_ptr<core::ProcessContext>& context,
+      const std::shared_ptr<core::ProcessSession>& session,
+      const std::string& hostname,
+      uint16_t port,
+      const std::string& username,
+      const std::string& remote_path,
+      uint64_t entity_tracking_time_window,
+      std::vector<Child>&& files);
+};
+
+REGISTER_RESOURCE(ListSFTP, "Performs a listing of the files residing on an SFTP server. "
+                            "For each file that is found on the remote server, a new FlowFile will be created with "
+                            "the filename attribute set to the name of the file on the remote server. "
+                            "This can then be used in conjunction with FetchSFTP in order to fetch those files.")
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index e1b589d..3262feb 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -42,6 +42,7 @@
 #include "ResourceClaim.h"
 #include "utils/StringUtils.h"
 #include "utils/ScopeGuard.h"
+#include "utils/file/FileUtils.h"
 
 namespace org {
 namespace apache {
@@ -49,30 +50,12 @@
 namespace minifi {
 namespace processors {
 
-core::Property PutSFTP::Hostname(
-    core::PropertyBuilder::createProperty("Hostname")->withDescription("The fully qualified hostname or IP address of the remote system")
-        ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port that the remote system is listening on for file transfers")
-        ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Username(
-    core::PropertyBuilder::createProperty("Username")->withDescription("Username")
-        ->supportsExpressionLanguage(true)->build());
-core::Property PutSFTP::Password(
-    core::PropertyBuilder::createProperty("Password")->withDescription("Password for the user account")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::PrivateKeyPath(
-    core::PropertyBuilder::createProperty("Private Key Path")->withDescription("The fully qualified path to the Private Key file")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::PrivateKeyPassphrase(
-    core::PropertyBuilder::createProperty("Private Key Passphrase")->withDescription("Password for the private key")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
 core::Property PutSFTP::RemotePath(
     core::PropertyBuilder::createProperty("Remote Path")->withDescription("The path on the remote system from which to pull or push files")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::CreateDirectory(
     core::PropertyBuilder::createProperty("Create Directory")->withDescription("Specifies whether or not the remote directory should be created if it does not exist.")
-        ->withDefaultValue<bool>(false)->build());
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
 core::Property PutSFTP::DisableDirectoryListing(
     core::PropertyBuilder::createProperty("Disable Directory Listing")->withDescription("If set to 'true', directory listing is not performed prior to create missing directories. "
                                                                                         "By default, this processor executes a directory listing command to see target directory existence before creating missing directories. "
@@ -80,18 +63,13 @@
                                                                                         "Directory listing might fail with some permission setups (e.g. chmod 100) on a directory. "
                                                                                         "Also, if any other SFTP client created the directory after this processor performed a listing and before a directory creation request by this processor is finished, "
                                                                                         "then an error is returned because the directory already exists.")
-                                                                                        ->isRequired(false)->withDefaultValue<bool>(false)->build());
+        ->isRequired(false)->withDefaultValue<bool>(false)->build());
 core::Property PutSFTP::BatchSize(
     core::PropertyBuilder::createProperty("Batch Size")->withDescription("The maximum number of FlowFiles to send in a single connection")
-        ->withDefaultValue<uint64_t>(500)->build());
-core::Property PutSFTP::ConnectionTimeout(
-    core::PropertyBuilder::createProperty("Connection Timeout")->withDescription("Amount of time to wait before timing out while creating a connection")
-        ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
-core::Property PutSFTP::DataTimeout(
-    core::PropertyBuilder::createProperty("Data Timeout")->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
-        ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+        ->isRequired(true)->withDefaultValue<uint64_t>(500)->build());
 core::Property PutSFTP::ConflictResolution(
     core::PropertyBuilder::createProperty("Conflict Resolution")->withDescription("Determines how to handle the problem of filename collisions")
+        ->isRequired(true)
         ->withAllowableValues<std::string>({CONFLICT_RESOLUTION_REPLACE,
                                             CONFLICT_RESOLUTION_IGNORE,
                                             CONFLICT_RESOLUTION_RENAME,
@@ -109,63 +87,34 @@
 core::Property PutSFTP::TempFilename(
     core::PropertyBuilder::createProperty("Temporary Filename")->withDescription("If set, the filename of the sent file will be equal to the value specified during the transfer and after successful completion will be renamed to the original filename. "
                                                                                  "If this value is set, the Dot Rename property is ignored.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HostKeyFile(
-    core::PropertyBuilder::createProperty("Host Key File")->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
-        ->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::LastModifiedTime(
     core::PropertyBuilder::createProperty("Last Modified Time")->withDescription("The lastModifiedTime to assign to the file after transferring it. "
                                                                                   "If not set, the lastModifiedTime will not be changed. "
                                                                                   "Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
                                                                                   "You may also use expression language such as ${file.lastModifiedTime}. "
                                                                                   "If the value is invalid, the processor will not be invalid but will fail to change lastModifiedTime of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::Permissions(
     core::PropertyBuilder::createProperty("Permissions")->withDescription("The permissions to assign to the file after transferring it. "
                                                                           "Format must be either UNIX rwxrwxrwx with a - in place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644). "
                                                                           "If not set, the permissions will not be changed. "
                                                                           "You may also use expression language such as ${file.permissions}. "
                                                                           "If the value is invalid, the processor will not be invalid but will fail to change permissions of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::RemoteOwner(
     core::PropertyBuilder::createProperty("Remote Owner")->withDescription("Integer value representing the User ID to set on the file after transferring it. "
                                                                            "If not set, the owner will not be set. You may also use expression language such as ${file.owner}. "
                                                                            "If the value is invalid, the processor will not be invalid but will fail to change the owner of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::RemoteGroup(
     core::PropertyBuilder::createProperty("Remote Group")->withDescription("Integer value representing the Group ID to set on the file after transferring it. "
                                                                            "If not set, the group will not be set. You may also use expression language such as ${file.group}. "
                                                                            "If the value is invalid, the processor will not be invalid but will fail to change the group of the file.")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::StrictHostKeyChecking(
-    core::PropertyBuilder::createProperty("Strict Host Key Checking")->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
-        ->withDefaultValue<bool>(false)->build());
-core::Property PutSFTP::UseKeepaliveOnTimeout(
-    core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
-        ->withDefaultValue<bool>(true)->build());
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
 core::Property PutSFTP::UseCompression(
     core::PropertyBuilder::createProperty("Use Compression")->withDescription("Indicates whether or not ZLIB compression should be used when transferring files")
-        ->withDefaultValue<bool>(false)->build());
-core::Property PutSFTP::ProxyType(
-    core::PropertyBuilder::createProperty("Proxy Type")->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
-                                                                         "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
-        ->isRequired(false)
-        ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
-                                            PROXY_TYPE_HTTP,
-                                            PROXY_TYPE_SOCKS})
-        ->withDefaultValue(PROXY_TYPE_DIRECT)->build());
-core::Property PutSFTP::ProxyHost(
-    core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The fully qualified hostname or IP address of the proxy server")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::ProxyPort(
-    core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HttpProxyUsername(
-    core::PropertyBuilder::createProperty("Http Proxy Username")->withDescription("Http Proxy Username")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
-core::Property PutSFTP::HttpProxyPassword(
-    core::PropertyBuilder::createProperty("Http Proxy Password")->withDescription("Http Proxy Password")
-        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
 
 core::Relationship PutSFTP::Success("success", "FlowFiles that are successfully sent will be routed to success");
 core::Relationship PutSFTP::Reject("reject", "FlowFiles that were rejected by the destination system");
@@ -176,35 +125,20 @@
 
   // Set the supported properties
   std::set<core::Property> properties;
-  properties.insert(Hostname);
-  properties.insert(Port);
-  properties.insert(Username);
-  properties.insert(Password);
-  properties.insert(PrivateKeyPath);
-  properties.insert(PrivateKeyPassphrase);
+  addSupportedCommonProperties(properties);
   properties.insert(RemotePath);
   properties.insert(CreateDirectory);
   properties.insert(DisableDirectoryListing);
   properties.insert(BatchSize);
-  properties.insert(ConnectionTimeout);
-  properties.insert(DataTimeout);
   properties.insert(ConflictResolution);
   properties.insert(RejectZeroByte);
   properties.insert(DotRename);
   properties.insert(TempFilename);
-  properties.insert(HostKeyFile);
   properties.insert(LastModifiedTime);
   properties.insert(Permissions);
   properties.insert(RemoteOwner);
   properties.insert(RemoteGroup);
-  properties.insert(StrictHostKeyChecking);
-  properties.insert(UseKeepaliveOnTimeout);
   properties.insert(UseCompression);
-  properties.insert(ProxyType);
-  properties.insert(ProxyHost);
-  properties.insert(ProxyPort);
-  properties.insert(HttpProxyUsername);
-  properties.insert(HttpProxyPassword);
   setSupportedProperties(properties);
   
   // Set the supported relationships
@@ -215,143 +149,21 @@
   setSupportedRelationships(relationships);
 }
 
-constexpr size_t PutSFTP::CONNECTION_CACHE_MAX_SIZE;
-
 PutSFTP::PutSFTP(std::string name, utils::Identifier uuid /*= utils::Identifier()*/)
-  : Processor(name, uuid),
-    logger_(logging::LoggerFactory<PutSFTP>::getLogger()),
+  : SFTPProcessorBase(name, uuid),
     create_directory_(false),
     batch_size_(0),
-    connection_timeout_(0),
-    data_timeout_(0),
     reject_zero_byte_(false),
-    dot_rename_(false),
-    strict_host_checking_(false),
-    use_keepalive_on_timeout_(false),
-    use_compression_(false),
-    running_(true) {
+    dot_rename_(false) {
+  logger_ = logging::LoggerFactory<PutSFTP>::getLogger();
 }
 
 PutSFTP::~PutSFTP() {
-  if (keepalive_thread_.joinable()) {
-    {
-      std::lock_guard<std::mutex> lock(connections_mutex_);
-      running_ = false;
-      keepalive_cv_.notify_one();
-    }
-    keepalive_thread_.join();
-  }
-}
-
-bool PutSFTP::ConnectionCacheKey::operator<(const PutSFTP::ConnectionCacheKey& other) const {
-  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port) <
-          std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port);
-}
-
-bool PutSFTP::ConnectionCacheKey::operator==(const PutSFTP::ConnectionCacheKey& other) const {
-  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port) ==
-         std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port);
-}
-
-std::unique_ptr<utils::SFTPClient> PutSFTP::getConnectionFromCache(const PutSFTP::ConnectionCacheKey& key) {
-  std::lock_guard<std::mutex> lock(connections_mutex_);
-
-  auto it = connections_.find(key);
-  if (it == connections_.end()) {
-    return nullptr;
-  }
-
-  logger_->log_debug("Removing %s@%s:%hu from SFTP connection pool",
-      key.username,
-      key.hostname,
-      key.port);
-
-  auto lru_it = std::find(lru_.begin(), lru_.end(), key);
-  if (lru_it == lru_.end()) {
-    logger_->log_trace("Assertion error: can't find key in LRU cache");
-  } else {
-    lru_.erase(lru_it);
-  }
-
-  auto connection = std::move(it->second);
-  connections_.erase(it);
-  return connection;
-}
-
-void PutSFTP::addConnectionToCache(const PutSFTP::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
-  std::lock_guard<std::mutex> lock(connections_mutex_);
-
-  while (connections_.size() >= PutSFTP::CONNECTION_CACHE_MAX_SIZE) {
-    const auto& lru_key = lru_.back();
-    logger_->log_debug("SFTP connection pool is full, removing %s@%s:%hu",
-        lru_key.username,
-        lru_key.hostname,
-        lru_key.port);
-    connections_.erase(lru_key);
-    lru_.pop_back();
-  }
-
-  logger_->log_debug("Adding %s@%s:%hu to SFTP connection pool",
-      key.username,
-      key.hostname,
-      key.port);
-  connections_.emplace(key, std::move(connection));
-  lru_.push_front(key);
-  keepalive_cv_.notify_one();
-}
-
-void PutSFTP::keepaliveThreadFunc() {
-  std::unique_lock<std::mutex> lock(connections_mutex_);
-
-  while (true) {
-    if (connections_.empty()) {
-      keepalive_cv_.wait(lock, [this] {
-        return !running_ || !connections_.empty();
-      });
-    }
-    if (!running_) {
-      logger_->log_trace("Stopping keepalive thread");
-      lock.unlock();
-      return;
-    }
-
-    int min_wait = 10;
-    for (auto &connection : connections_) {
-      int seconds_to_next = 0;
-      if (connection.second->sendKeepAliveIfNeeded(seconds_to_next)) {
-        logger_->log_debug("Sent keepalive to %s@%s:%hu if needed, next keepalive in %d s",
-           connection.first.username,
-           connection.first.hostname,
-           connection.first.port,
-           seconds_to_next);
-        if (seconds_to_next < min_wait) {
-          min_wait = seconds_to_next;
-        }
-      } else {
-        logger_->log_debug("Failed to send keepalive to %s@%s:%hu",
-                           connection.first.username,
-                           connection.first.hostname,
-                           connection.first.port);
-      }
-    }
-
-    /* Avoid busy loops */
-    if (min_wait < 1) {
-      min_wait = 1;
-    }
-
-    logger_->log_trace("Keepalive thread is going to sleep for %d s", min_wait);
-    keepalive_cv_.wait_for(lock, std::chrono::seconds(min_wait), [this] {
-      return !running_;
-    });
-    if (!running_) {
-      lock.unlock();
-      return;
-    }
-  }
 }
 
 void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  parseCommonPropertiesOnSchedule(context);
+
   std::string value;
   if (!context->getProperty(CreateDirectory.getName(), value)) {
     logger_->log_error("Create Directory attribute is missing or invalid");
@@ -363,22 +175,6 @@
   } else {
     core::Property::StringToInt(value, batch_size_);
   }
-  if (!context->getProperty(ConnectionTimeout.getName(), value)) {
-    logger_->log_error("Connection Timeout attribute is missing or invalid");
-  } else {
-    core::TimeUnit unit;
-    if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
-      logger_->log_error("Connection Timeout attribute is invalid");
-    }
-  }
-  if (!context->getProperty(DataTimeout.getName(), value)) {
-    logger_->log_error("Data Timeout attribute is missing or invalid");
-  } else {
-    core::TimeUnit unit;
-    if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
-      logger_->log_error("Data Timeout attribute is invalid");
-    }
-  }
   context->getProperty(ConflictResolution.getName(), conflict_resolution_);
   if (context->getProperty(RejectZeroByte.getName(), value)) {
     utils::StringUtils::StringToBool(value, reject_zero_byte_);
@@ -386,43 +182,13 @@
   if (context->getProperty(DotRename.getName(), value)) {
     utils::StringUtils::StringToBool(value, dot_rename_);
   }
-  context->getProperty(HostKeyFile.getName(), host_key_file_);
-  if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
-    logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
-  } else {
-    utils::StringUtils::StringToBool(value, strict_host_checking_);
-  }
-  if (!context->getProperty(UseKeepaliveOnTimeout.getName(), value)) {
-    logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
-  } else {
-    utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);
-  }
   if (!context->getProperty(UseCompression.getName(), value)) {
     logger_->log_error("Use Compression attribute is missing or invalid");
   } else {
     utils::StringUtils::StringToBool(value, use_compression_);
   }
-  context->getProperty(ProxyType.getName(), proxy_type_);
 
-  if (use_keepalive_on_timeout_ && !keepalive_thread_.joinable()) {
-    running_ = true;
-    keepalive_thread_ = std::thread(&PutSFTP::keepaliveThreadFunc, this);
-  }
-}
-
-void PutSFTP::notifyStop() {
-  logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
-  if (keepalive_thread_.joinable()) {
-    {
-      std::lock_guard<std::mutex> lock(connections_mutex_);
-      running_ = false;
-      keepalive_cv_.notify_one();
-    }
-    keepalive_thread_.join();
-  }
-  /* The thread is no longer running, we don't have to lock */
-  connections_.clear();
-  lru_.clear();
+  startKeepaliveThreadIfNeeded();
 }
 
 PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
@@ -443,30 +209,26 @@
       *stream,
       conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
       stream->getSize() /*expected_size*/)) {
-    return -1;
+    throw client_.getLastError();
   }
-  write_succeeded_ = true;
   return stream->getSize();
 }
 
-bool PutSFTP::ReadCallback::commit() {
-  return write_succeeded_;
-}
-
 bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get());
   if (flow_file == nullptr) {
     return false;
   }
 
-  /* Parse possibly FlowFile-dependent properties */
+  /* Parse common properties */
+  SFTPProcessorBase::CommonProperties common_properties;
+  if (!parseCommonPropertiesOnTrigger(context, flow_file, common_properties)) {
+    context->yield();
+    return false;
+  }
+
+  /* Parse processor-specific properties */
   std::string filename;
-  std::string hostname;
-  uint16_t port = 0U;
-  std::string username;
-  std::string password;
-  std::string private_key_path;
-  std::string private_key_passphrase;
   std::string remote_path;
   bool disable_directory_listing = false;
   std::string temp_file_name;
@@ -478,50 +240,11 @@
   uint64_t remote_owner = 0U;
   bool remote_group_set = false;
   uint64_t remote_group = 0U;
-  std::string proxy_host;
-  uint16_t proxy_port = 0U;
-  std::string proxy_username;
-  std::string proxy_password;
 
   flow_file->getKeyedAttribute(FILENAME, filename);
 
   std::string value;
-  if (!context->getProperty(Hostname, hostname, flow_file)) {
-    logger_->log_error("Hostname attribute is missing");
-    context->yield();
-    return false;
-  }
-  if (!context->getProperty(Port, value, flow_file)) {
-    logger_->log_error("Port attribute is missing or invalid");
-    context->yield();
-    return false;
-  } else {
-    int port_tmp;
-    if (!core::Property::StringToInt(value, port_tmp) ||
-        port_tmp < std::numeric_limits<uint16_t>::min() ||
-        port_tmp > std::numeric_limits<uint16_t>::max()) {
-      logger_->log_error("Port attribute \"%s\" is invalid", value);
-      context->yield();
-      return false;
-    } else {
-      port = static_cast<uint16_t>(port_tmp);
-    }
-  }
-  if (!context->getProperty(Username, username, flow_file)) {
-    logger_->log_error("Username attribute is missing");
-    context->yield();
-    return false;
-  }
-  context->getProperty(Password, password, flow_file);
-  context->getProperty(PrivateKeyPath, private_key_path, flow_file);
-  context->getProperty(PrivateKeyPassphrase, private_key_passphrase, flow_file);
-  context->getProperty(Password, password, flow_file);
   context->getProperty(RemotePath, remote_path, flow_file);
-  if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
-    utils::StringUtils::StringToBool(value, disable_directory_listing);
-  } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
-    utils::StringUtils::StringToBool(value, disable_directory_listing);
-  }
   /* Remove trailing slashes */
   while (remote_path.size() > 1U && remote_path.back() == '/') {
     remote_path.resize(remote_path.size() - 1);
@@ -530,6 +253,11 @@
   if (remote_path.empty()) {
     remote_path = ".";
   }
+  if (context->getDynamicProperty(DisableDirectoryListing.getName(), value)) {
+    utils::StringUtils::StringToBool(value, disable_directory_listing);
+  } else if (context->getProperty(DisableDirectoryListing.getName(), value)) {
+    utils::StringUtils::StringToBool(value, disable_directory_listing);
+  }
   context->getProperty(TempFilename, temp_file_name, flow_file);
   if (context->getProperty(LastModifiedTime, value, flow_file)) {
     if (core::Property::StringToDateTime(value, last_modified_time)) {
@@ -551,21 +279,6 @@
       remote_group_set = true;
     }
   }
-  context->getProperty(ProxyHost, proxy_host, flow_file);
-  if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) {
-    int port_tmp;
-    if (!core::Property::StringToInt(value, port_tmp) ||
-        port_tmp < std::numeric_limits<uint16_t>::min() ||
-        port_tmp > std::numeric_limits<uint16_t>::max()) {
-      logger_->log_error("Proxy Port attribute \"%s\" is invalid", value);
-      context->yield();
-      return false;
-    } else {
-      proxy_port = static_cast<uint16_t>(port_tmp);
-    }
-  }
-  context->getProperty(HttpProxyUsername, proxy_username, flow_file);
-  context->getProperty(HttpProxyPassword, proxy_password, flow_file);
 
   /* Reject zero-byte files if needed */
   if (reject_zero_byte_ && flow_file->getSize() == 0U) {
@@ -575,56 +288,21 @@
   }
 
   /* Get SFTPClient from cache or create it */
-  const PutSFTP::ConnectionCacheKey connection_cache_key = {hostname, port, username, proxy_type_, proxy_host, proxy_port};
-  auto client = getConnectionFromCache(connection_cache_key);
+  const SFTPProcessorBase::ConnectionCacheKey connection_cache_key = {common_properties.hostname,
+                                                                      common_properties.port,
+                                                                      common_properties.username,
+                                                                      proxy_type_,
+                                                                      common_properties.proxy_host,
+                                                                      common_properties.proxy_port,
+                                                                      common_properties.proxy_username};
+  auto client = getOrCreateConnection(connection_cache_key,
+                                      common_properties.password,
+                                      common_properties.private_key_path,
+                                      common_properties.private_key_passphrase,
+                                      common_properties.proxy_password);
   if (client == nullptr) {
-    client = std::unique_ptr<utils::SFTPClient>(new utils::SFTPClient(hostname, port, username));
-    if (!IsNullOrEmpty(host_key_file_)) {
-      if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
-        logger_->log_error("Cannot set host key file");
-        context->yield();
-        return false;
-      }
-    }
-    if (!IsNullOrEmpty(password)) {
-      client->setPasswordAuthenticationCredentials(password);
-    }
-    if (!IsNullOrEmpty(private_key_path)) {
-      client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
-    }
-    if (proxy_type_ != PROXY_TYPE_DIRECT) {
-      utils::HTTPProxy proxy;
-      proxy.host = proxy_host;
-      proxy.port = proxy_port;
-      proxy.username = proxy_username;
-      proxy.password = proxy_password;
-      if (!client->setProxy(
-          proxy_type_ == PROXY_TYPE_HTTP ? utils::SFTPClient::ProxyType::Http : utils::SFTPClient::ProxyType::Socks,
-          proxy)) {
-        logger_->log_error("Cannot set proxy");
-        context->yield();
-        return false;
-      }
-    }
-    if (!client->setConnectionTimeout(connection_timeout_)) {
-      logger_->log_error("Cannot set connection timeout");
-      context->yield();
-      return false;
-    }
-    client->setDataTimeout(data_timeout_);
-    client->setSendKeepAlive(use_keepalive_on_timeout_);
-    if (!client->setUseCompression(use_compression_)) {
-      logger_->log_error("Cannot set compression");
-      context->yield();
-      return false;
-    }
-
-    /* Connect to SFTP server */
-    if (!client->connect()) {
-      logger_->log_error("Cannot connect to SFTP server");
-      context->yield();
-      return false;
-    }
+    context->yield();
+    return false;
   }
 
   /*
@@ -638,13 +316,10 @@
   /* Try to detect conflicts if needed */
   std::string resolved_filename = filename;
   if (conflict_resolution_ != CONFLICT_RESOLUTION_NONE) {
-    std::stringstream target_path_ss;
-    target_path_ss << remote_path << "/" << filename;
-    auto target_path = target_path_ss.str();
+    std::string target_path = utils::file::FileUtils::concat_path(remote_path, filename, true /*force_posix*/);
     LIBSSH2_SFTP_ATTRIBUTES attrs;
-    bool file_not_exists;
-    if (!client->stat(target_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-      if (!file_not_exists) {
+    if (!client->stat(target_path, true /*follow_symlinks*/, attrs)) {
+      if (client->getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
         logger_->log_error("Failed to stat %s", target_path.c_str());
         session->transfer(flow_file, Failure);
         return true;
@@ -679,10 +354,9 @@
           std::stringstream possible_resolved_filename_ss;
           possible_resolved_filename_ss << i << "." << filename;
           possible_resolved_filename = possible_resolved_filename_ss.str();
-          auto possible_resolved_path = remote_path + "/" + possible_resolved_filename;
-          bool file_not_exists;
-          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-            if (file_not_exists) {
+          std::string possible_resolved_path = utils::file::FileUtils::concat_path(remote_path, possible_resolved_filename, true /*force_posix*/);
+          if (!client->stat(possible_resolved_path, true /*follow_symlinks*/, attrs)) {
+            if (client->getLastError() == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
               unique_name_generated = true;
               break;
             } else {
@@ -707,50 +381,26 @@
 
   /* Create remote directory if needed */
   if (create_directory_) {
-    bool should_create_directory = disable_directory_listing;
-    if (!disable_directory_listing) {
-      LIBSSH2_SFTP_ATTRIBUTES attrs;
-      bool file_not_exists;
-      if (!client->stat(remote_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-        if (!file_not_exists) {
-          logger_->log_error("Failed to stat %s", remote_path.c_str());
-        }
-        should_create_directory = true;
-      } else {
-        if (attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
-          logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
-          session->transfer(flow_file, Failure);
-          put_connection_back_to_cache();
-          return true;
-        }
-        logger_->log_debug("Found remote directory %s", remote_path.c_str());
-      }
-    }
-    if (should_create_directory) {
-      (void) client->createDirectoryHierarchy(remote_path);
-      if (!disable_directory_listing) {
-        LIBSSH2_SFTP_ATTRIBUTES attrs;
-        bool file_not_exists;
-        if (!client->stat(remote_path, true /*follow_symlinks*/, attrs, file_not_exists)) {
-          if (file_not_exists) {
-            logger_->log_error("Could not find remote directory %s after creating it", remote_path.c_str());
-            session->transfer(flow_file, Failure);
-            put_connection_back_to_cache();
-            return true;
-          } else {
-            logger_->log_error("Failed to stat %s", remote_path.c_str());
-            context->yield();
-            return false;
-          }
-        } else {
-          if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
-            logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
-            session->transfer(flow_file, Failure);
-            put_connection_back_to_cache();
-            return true;
-          }
-        }
-      }
+    auto res = createDirectoryHierarchy(*client, remote_path, disable_directory_listing);
+    switch (res) {
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK:
+        break;
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED:
+        context->yield();
+        return false;
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY:
+        session->transfer(flow_file, Failure);
+        put_connection_back_to_cache();
+        return true;
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND:
+      case SFTPProcessorBase::CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED:
+        session->transfer(flow_file, Failure);
+        put_connection_back_to_cache();
+        return true;
+      default:
+        logger_->log_error("Unknown createDirectoryHierarchy result: %hhu", static_cast<uint8_t>(res));
+        context->yield();
+        return false;
     }
   }
 
@@ -765,15 +415,13 @@
     target_path_ss << resolved_filename;
   }
   auto target_path = target_path_ss.str();
-  std::stringstream final_target_path_ss;
-  final_target_path_ss << remote_path << "/" << resolved_filename;
-  auto final_target_path = final_target_path_ss.str();
+  std::string final_target_path = utils::file::FileUtils::concat_path(remote_path, resolved_filename, true /*force_posix*/);
   logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
 
   ReadCallback read_callback(target_path.c_str(), *client, conflict_resolution_);
-  session->read(flow_file, &read_callback);
-
-  if (!read_callback.commit()) {
+  try {
+    session->read(flow_file, &read_callback);
+  } catch (const utils::SFTPError&) {
     session->transfer(flow_file, Failure);
     return true;
   }
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index d04fbf8..3d2a115 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -26,6 +26,7 @@
 #include <mutex>
 #include <thread>
 
+#include "SFTPProcessorBase.h"
 #include "utils/ByteArrayCallback.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -44,7 +45,7 @@
 namespace minifi {
 namespace processors {
 
-class PutSFTP : public core::Processor {
+ class PutSFTP : public SFTPProcessorBase {
  public:
 
   static constexpr char const *CONFLICT_RESOLUTION_REPLACE = "REPLACE";
@@ -54,10 +55,6 @@
   static constexpr char const *CONFLICT_RESOLUTION_FAIL = "FAIL";
   static constexpr char const *CONFLICT_RESOLUTION_NONE = "NONE";
 
-  static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";
-  static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
-  static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
-
   static constexpr char const* ProcessorName = "PutSFTP";
 
 
@@ -68,35 +65,19 @@
   virtual ~PutSFTP();
 
   // Supported Properties
-  static core::Property Hostname;
-  static core::Property Port;
-  static core::Property Username;
-  static core::Property Password;
-  static core::Property PrivateKeyPath;
-  static core::Property PrivateKeyPassphrase;
   static core::Property RemotePath;
   static core::Property CreateDirectory;
   static core::Property DisableDirectoryListing;
   static core::Property BatchSize;
-  static core::Property ConnectionTimeout;
-  static core::Property DataTimeout;
   static core::Property ConflictResolution;
   static core::Property RejectZeroByte;
   static core::Property DotRename;
   static core::Property TempFilename;
-  static core::Property HostKeyFile;
   static core::Property LastModifiedTime;
   static core::Property Permissions;
   static core::Property RemoteOwner;
   static core::Property RemoteGroup;
-  static core::Property StrictHostKeyChecking;
-  static core::Property UseKeepaliveOnTimeout;
   static core::Property UseCompression;
-  static core::Property ProxyType;
-  static core::Property ProxyHost;
-  static core::Property ProxyPort;
-  static core::Property HttpProxyUsername;
-  static core::Property HttpProxyPassword;
 
   // Supported Relationships
   static core::Relationship Success;
@@ -110,7 +91,6 @@
   virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   virtual void initialize() override;
   virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
-  virtual void notifyStop() override;
 
   class ReadCallback : public InputStreamCallback {
    public:
@@ -119,7 +99,6 @@
         const std::string& conflict_resolution);
     ~ReadCallback();
     virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
-    bool commit();
 
    private:
     std::shared_ptr<logging::Logger> logger_;
@@ -131,43 +110,11 @@
 
  private:
 
-  std::shared_ptr<logging::Logger> logger_;
-
   bool create_directory_;
   uint64_t batch_size_;
-  int64_t connection_timeout_;
-  int64_t data_timeout_;
   std::string conflict_resolution_;
   bool reject_zero_byte_;
   bool dot_rename_;
-  std::string host_key_file_;
-  bool strict_host_checking_;
-  bool use_keepalive_on_timeout_;
-  bool use_compression_;
-  std::string proxy_type_;
-
-  static constexpr size_t CONNECTION_CACHE_MAX_SIZE = 8U;
-  struct ConnectionCacheKey {
-    std::string hostname;
-    uint16_t port;
-    std::string username;
-    std::string proxy_type;
-    std::string proxy_host;
-    uint16_t proxy_port;
-
-    bool operator<(const ConnectionCacheKey& other) const;
-    bool operator==(const ConnectionCacheKey& other) const;
-  };
-  std::mutex connections_mutex_;
-  std::map<ConnectionCacheKey, std::unique_ptr<utils::SFTPClient>> connections_;
-  std::list<ConnectionCacheKey> lru_;
-  std::unique_ptr<utils::SFTPClient> getConnectionFromCache(const ConnectionCacheKey& key);
-  void addConnectionToCache(const ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection);
-
-  std::thread keepalive_thread_;
-  bool running_;
-  std::condition_variable keepalive_cv_;
-  void keepaliveThreadFunc();
 
   bool processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
 };
diff --git a/extensions/sftp/processors/SFTPProcessorBase.cpp b/extensions/sftp/processors/SFTPProcessorBase.cpp
new file mode 100644
index 0000000..63c335a
--- /dev/null
+++ b/extensions/sftp/processors/SFTPProcessorBase.cpp
@@ -0,0 +1,475 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SFTPProcessorBase.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property SFTPProcessorBase::Hostname(
+    core::PropertyBuilder::createProperty("Hostname")->withDescription("The fully qualified hostname or IP address of the remote system")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Port(
+    core::PropertyBuilder::createProperty("Port")->withDescription("The port that the remote system is listening on for file transfers")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Username(
+    core::PropertyBuilder::createProperty("Username")->withDescription("Username")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::Password(
+    core::PropertyBuilder::createProperty("Password")->withDescription("Password for the user account")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::PrivateKeyPath(
+    core::PropertyBuilder::createProperty("Private Key Path")->withDescription("The fully qualified path to the Private Key file")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::PrivateKeyPassphrase(
+    core::PropertyBuilder::createProperty("Private Key Passphrase")->withDescription("Password for the private key")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::StrictHostKeyChecking(
+    core::PropertyBuilder::createProperty("Strict Host Key Checking")->withDescription("Indicates whether or not strict enforcement of hosts keys should be applied")
+        ->isRequired(true)->withDefaultValue<bool>(false)->build());
+core::Property SFTPProcessorBase::HostKeyFile(
+    core::PropertyBuilder::createProperty("Host Key File")->withDescription("If supplied, the given file will be used as the Host Key; otherwise, no use host key file will be used")
+        ->isRequired(false)->build());
+core::Property SFTPProcessorBase::ConnectionTimeout(
+    core::PropertyBuilder::createProperty("Connection Timeout")->withDescription("Amount of time to wait before timing out while creating a connection")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SFTPProcessorBase::DataTimeout(
+    core::PropertyBuilder::createProperty("Data Timeout")->withDescription("When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems")
+        ->isRequired(true)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property SFTPProcessorBase::SendKeepaliveOnTimeout(
+    core::PropertyBuilder::createProperty("Send Keep Alive On Timeout")->withDescription("Indicates whether or not to send a single Keep Alive message when SSH socket times out")
+        ->isRequired(true)->withDefaultValue<bool>(true)->build());
+core::Property SFTPProcessorBase::ProxyType(
+    core::PropertyBuilder::createProperty("Proxy Type")->withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. If set, it supersedes proxy settings configured per component. "
+                                                                         "Supported proxies: HTTP + AuthN, SOCKS + AuthN")
+        ->isRequired(false)
+        ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
+                                            PROXY_TYPE_HTTP,
+                                            PROXY_TYPE_SOCKS})
+        ->withDefaultValue(PROXY_TYPE_DIRECT)->build());
+core::Property SFTPProcessorBase::ProxyHost(
+    core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The fully qualified hostname or IP address of the proxy server")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::ProxyPort(
+    core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The port of the proxy server")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::HttpProxyUsername(
+    core::PropertyBuilder::createProperty("Http Proxy Username")->withDescription("Http Proxy Username")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+core::Property SFTPProcessorBase::HttpProxyPassword(
+    core::PropertyBuilder::createProperty("Http Proxy Password")->withDescription("Http Proxy Password")
+        ->isRequired(false)->supportsExpressionLanguage(true)->build());
+
+constexpr size_t SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE;
+
+SFTPProcessorBase::SFTPProcessorBase(std::string name, utils::Identifier uuid)
+    : Processor(name, uuid),
+      connection_timeout_(0),
+      data_timeout_(0),
+      strict_host_checking_(false),
+      use_keepalive_on_timeout_(false),
+      use_compression_(false),
+      running_(true) {
+}
+
+SFTPProcessorBase::~SFTPProcessorBase() {
+  if (keepalive_thread_.joinable()) {
+    {
+      std::lock_guard<std::mutex> lock(connections_mutex_);
+      running_ = false;
+      keepalive_cv_.notify_one();
+    }
+    keepalive_thread_.join();
+  }
+}
+
+void SFTPProcessorBase::notifyStop() {
+  logger_->log_debug("Got notifyStop, stopping keepalive thread and clearing connections");
+  cleanupConnectionCache();
+}
+
+void SFTPProcessorBase::addSupportedCommonProperties(std::set<core::Property>& supported_properties) {
+  supported_properties.insert(Hostname);
+  supported_properties.insert(Port);
+  supported_properties.insert(Username);
+  supported_properties.insert(Password);
+  supported_properties.insert(PrivateKeyPath);
+  supported_properties.insert(PrivateKeyPassphrase);
+  supported_properties.insert(StrictHostKeyChecking);
+  supported_properties.insert(HostKeyFile);
+  supported_properties.insert(ConnectionTimeout);
+  supported_properties.insert(DataTimeout);
+  supported_properties.insert(SendKeepaliveOnTimeout);
+  supported_properties.insert(ProxyType);
+  supported_properties.insert(ProxyHost);
+  supported_properties.insert(ProxyPort);
+  supported_properties.insert(HttpProxyUsername);
+  supported_properties.insert(HttpProxyPassword);
+}
+
+void SFTPProcessorBase::parseCommonPropertiesOnSchedule(const std::shared_ptr<core::ProcessContext>& context) {
+  std::string value;
+  if (!context->getProperty(StrictHostKeyChecking.getName(), value)) {
+    logger_->log_error("Strict Host Key Checking attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, strict_host_checking_);
+  }
+  context->getProperty(HostKeyFile.getName(), host_key_file_);
+  if (!context->getProperty(ConnectionTimeout.getName(), value)) {
+    logger_->log_error("Connection Timeout attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, connection_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(connection_timeout_, unit, connection_timeout_)) {
+      logger_->log_error("Connection Timeout attribute is invalid");
+    }
+  }
+  if (!context->getProperty(DataTimeout.getName(), value)) {
+    logger_->log_error("Data Timeout attribute is missing or invalid");
+  } else {
+    core::TimeUnit unit;
+    if (!core::Property::StringToTime(value, data_timeout_, unit) || !core::Property::ConvertTimeUnitToMS(data_timeout_, unit, data_timeout_)) {
+      logger_->log_error("Data Timeout attribute is invalid");
+    }
+  }
+  if (!context->getProperty(SendKeepaliveOnTimeout.getName(), value)) {
+    logger_->log_error("Send Keep Alive On Timeout attribute is missing or invalid");
+  } else {
+    utils::StringUtils::StringToBool(value, use_keepalive_on_timeout_);
+  }
+  context->getProperty(ProxyType.getName(), proxy_type_);
+}
+
+SFTPProcessorBase::CommonProperties::CommonProperties()
+    : port(0U)
+    , proxy_port(0U)
+{
+}
+
+bool SFTPProcessorBase::parseCommonPropertiesOnTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<FlowFileRecord>& flow_file, CommonProperties& common_properties) {
+  std::string value;
+  if (!context->getProperty(Hostname, common_properties.hostname, flow_file)) {
+    logger_->log_error("Hostname attribute is missing");
+    return false;
+  }
+  if (!context->getProperty(Port, value, flow_file)) {
+    logger_->log_error("Port attribute is missing or invalid");
+    return false;
+  } else {
+    int port_tmp;
+    if (!core::Property::StringToInt(value, port_tmp) ||
+        port_tmp <= std::numeric_limits<uint16_t>::min() ||
+        port_tmp > std::numeric_limits<uint16_t>::max()) {
+      logger_->log_error("Port attribute \"%s\" is invalid", value);
+      return false;
+    } else {
+      common_properties.port = static_cast<uint16_t>(port_tmp);
+    }
+  }
+  if (!context->getProperty(Username, common_properties.username, flow_file)) {
+    logger_->log_error("Username attribute is missing");
+    return false;
+  }
+  context->getProperty(Password, common_properties.password, flow_file);
+  context->getProperty(PrivateKeyPath, common_properties.private_key_path, flow_file);
+  context->getProperty(PrivateKeyPassphrase, common_properties.private_key_passphrase, flow_file);
+  context->getProperty(Password, common_properties.password, flow_file);
+  context->getProperty(ProxyHost, common_properties.proxy_host, flow_file);
+  if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) {
+    int port_tmp;
+    if (!core::Property::StringToInt(value, port_tmp) ||
+        port_tmp <= std::numeric_limits<uint16_t>::min() ||
+        port_tmp > std::numeric_limits<uint16_t>::max()) {
+      logger_->log_error("Proxy Port attribute \"%s\" is invalid", value);
+      return false;
+    } else {
+      common_properties.proxy_port = static_cast<uint16_t>(port_tmp);
+    }
+  }
+  context->getProperty(HttpProxyUsername, common_properties.proxy_username, flow_file);
+  context->getProperty(HttpProxyPassword, common_properties.proxy_password, flow_file);
+
+  return true;
+}
+
+bool SFTPProcessorBase::ConnectionCacheKey::operator<(const SFTPProcessorBase::ConnectionCacheKey& other) const {
+  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port, proxy_username) <
+         std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port, other.proxy_username);
+}
+
+bool SFTPProcessorBase::ConnectionCacheKey::operator==(const SFTPProcessorBase::ConnectionCacheKey& other) const {
+  return std::tie(hostname, port, username, proxy_type, proxy_host, proxy_port, proxy_username) ==
+         std::tie(other.hostname, other.port, other.username, other.proxy_type, other.proxy_host, other.proxy_port, other.proxy_username);
+}
+
+std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getConnectionFromCache(const SFTPProcessorBase::ConnectionCacheKey& key) {
+  std::lock_guard<std::mutex> lock(connections_mutex_);
+
+  auto it = connections_.find(key);
+  if (it == connections_.end()) {
+    return nullptr;
+  }
+
+  logger_->log_debug("Removing %s@%s:%hu from SFTP connection pool",
+                     key.username,
+                     key.hostname,
+                     key.port);
+
+  auto lru_it = std::find(lru_.begin(), lru_.end(), key);
+  if (lru_it == lru_.end()) {
+    logger_->log_trace("Assertion error: can't find key in LRU cache");
+  } else {
+    lru_.erase(lru_it);
+  }
+
+  auto connection = std::move(it->second);
+  connections_.erase(it);
+  return connection;
+}
+
+void SFTPProcessorBase::addConnectionToCache(const SFTPProcessorBase::ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection) {
+  std::lock_guard<std::mutex> lock(connections_mutex_);
+
+  while (connections_.size() >= SFTPProcessorBase::CONNECTION_CACHE_MAX_SIZE) {
+    const auto& lru_key = lru_.back();
+    logger_->log_debug("SFTP connection pool is full, removing %s@%s:%hu",
+                       lru_key.username,
+                       lru_key.hostname,
+                       lru_key.port);
+    connections_.erase(lru_key);
+    lru_.pop_back();
+  }
+
+  logger_->log_debug("Adding %s@%s:%hu to SFTP connection pool",
+                     key.username,
+                     key.hostname,
+                     key.port);
+  connections_.emplace(key, std::move(connection));
+  lru_.push_front(key);
+  keepalive_cv_.notify_one();
+}
+
+void SFTPProcessorBase::keepaliveThreadFunc() {
+  std::unique_lock<std::mutex> lock(connections_mutex_);
+
+  while (true) {
+    if (connections_.empty()) {
+      keepalive_cv_.wait(lock, [this] {
+        return !running_ || !connections_.empty();
+      });
+    }
+    if (!running_) {
+      logger_->log_trace("Stopping keepalive thread");
+      lock.unlock();
+      return;
+    }
+
+    int min_wait = 10;
+    for (auto &connection : connections_) {
+      int seconds_to_next = 0;
+      if (connection.second->sendKeepAliveIfNeeded(seconds_to_next)) {
+        logger_->log_debug("Sent keepalive to %s@%s:%hu if needed, next keepalive in %d s",
+                           connection.first.username,
+                           connection.first.hostname,
+                           connection.first.port,
+                           seconds_to_next);
+        if (seconds_to_next < min_wait) {
+          min_wait = seconds_to_next;
+        }
+      } else {
+        logger_->log_debug("Failed to send keepalive to %s@%s:%hu",
+                           connection.first.username,
+                           connection.first.hostname,
+                           connection.first.port);
+      }
+    }
+
+    /* Avoid busy loops */
+    if (min_wait < 1) {
+      min_wait = 1;
+    }
+
+    logger_->log_trace("Keepalive thread is going to sleep for %d s", min_wait);
+    keepalive_cv_.wait_for(lock, std::chrono::seconds(min_wait), [this] {
+      return !running_;
+    });
+    if (!running_) {
+      lock.unlock();
+      return;
+    }
+  }
+}
+
+void SFTPProcessorBase::startKeepaliveThreadIfNeeded() {
+  if (use_keepalive_on_timeout_ && !keepalive_thread_.joinable()) {
+    running_ = true;
+    keepalive_thread_ = std::thread(&SFTPProcessorBase::keepaliveThreadFunc, this);
+  }
+}
+
+void SFTPProcessorBase::cleanupConnectionCache() {
+  if (keepalive_thread_.joinable()) {
+    {
+      std::lock_guard<std::mutex> lock(connections_mutex_);
+      running_ = false;
+      keepalive_cv_.notify_one();
+    }
+    keepalive_thread_.join();
+  }
+  /* The thread is no longer running, we don't have to lock */
+  connections_.clear();
+  lru_.clear();
+}
+
+std::unique_ptr<utils::SFTPClient> SFTPProcessorBase::getOrCreateConnection(
+    const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
+    const std::string& password,
+    const std::string& private_key_path,
+    const std::string& private_key_passphrase,
+    const std::string& proxy_password) {
+  auto client = getConnectionFromCache(connection_cache_key);
+  if (client == nullptr) {
+    client = std::unique_ptr<utils::SFTPClient>(new utils::SFTPClient(connection_cache_key.hostname,
+                                                                      connection_cache_key.port,
+                                                                      connection_cache_key.username));
+    if (!IsNullOrEmpty(host_key_file_)) {
+      if (!client->setHostKeyFile(host_key_file_, strict_host_checking_)) {
+        logger_->log_error("Cannot set host key file");
+        return nullptr;
+      }
+    }
+    if (!IsNullOrEmpty(password)) {
+      client->setPasswordAuthenticationCredentials(password);
+    }
+    if (!IsNullOrEmpty(private_key_path)) {
+      client->setPublicKeyAuthenticationCredentials(private_key_path, private_key_passphrase);
+    }
+    if (connection_cache_key.proxy_type != PROXY_TYPE_DIRECT) {
+      utils::HTTPProxy proxy;
+      proxy.host = connection_cache_key.proxy_host;
+      proxy.port = connection_cache_key.proxy_port;
+      proxy.username = connection_cache_key.proxy_username;
+      proxy.password = proxy_password;
+      if (!client->setProxy(
+          connection_cache_key.proxy_type == PROXY_TYPE_HTTP ? utils::SFTPClient::ProxyType::Http : utils::SFTPClient::ProxyType::Socks,
+          proxy)) {
+        logger_->log_error("Cannot set proxy");
+        return nullptr;
+      }
+    }
+    if (!client->setConnectionTimeout(connection_timeout_)) {
+      logger_->log_error("Cannot set connection timeout");
+      return nullptr;
+    }
+    client->setDataTimeout(data_timeout_);
+    client->setSendKeepAlive(use_keepalive_on_timeout_);
+    if (!client->setUseCompression(use_compression_)) {
+      logger_->log_error("Cannot set compression");
+      return nullptr;
+    }
+
+    /* Connect to SFTP server */
+    if (!client->connect()) {
+      logger_->log_error("Cannot connect to SFTP server");
+      return nullptr;
+    }
+  }
+
+  return client;
+}
+
+SFTPProcessorBase::CreateDirectoryHierarchyError SFTPProcessorBase::createDirectoryHierarchy(
+    utils::SFTPClient& client,
+    const std::string& remote_path,
+    bool disable_directory_listing) {
+  bool should_create_directory = disable_directory_listing;
+  if (!disable_directory_listing) {
+    LIBSSH2_SFTP_ATTRIBUTES attrs;
+    if (!client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
+      if (client.getLastError() != utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
+        logger_->log_error("Failed to stat %s", remote_path.c_str());
+      }
+      should_create_directory = true;
+    } else {
+      if (attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+        logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
+        return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
+      }
+      logger_->log_debug("Found remote directory %s", remote_path.c_str());
+    }
+  }
+  if (should_create_directory) {
+    (void) client.createDirectoryHierarchy(remote_path);
+    if (!disable_directory_listing) {
+      LIBSSH2_SFTP_ATTRIBUTES attrs;
+      if (!client.stat(remote_path, true /*follow_symlinks*/, attrs)) {
+        auto last_error = client.getLastError();
+        if (last_error == utils::SFTPError::SFTP_ERROR_FILE_NOT_EXISTS) {
+          logger_->log_error("Could not find remote directory %s after creating it", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND;
+        } else if (last_error == utils::SFTPError::SFTP_ERROR_PERMISSION_DENIED) {
+          logger_->log_error("Permission denied when reading remote directory %s after creating it", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED;
+        } else {
+          logger_->log_error("Failed to stat %s", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED;
+        }
+      } else {
+        if ((attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) && !LIBSSH2_SFTP_S_ISDIR(attrs.permissions)) {
+          logger_->log_error("Remote path %s is not a directory", remote_path.c_str());
+          return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY;
+        }
+      }
+    }
+  }
+
+  return CreateDirectoryHierarchyError::CREATE_DIRECTORY_HIERARCHY_ERROR_OK;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sftp/processors/SFTPProcessorBase.h b/extensions/sftp/processors/SFTPProcessorBase.h
new file mode 100644
index 0000000..4d83fff
--- /dev/null
+++ b/extensions/sftp/processors/SFTPProcessorBase.h
@@ -0,0 +1,156 @@
+/**
+ * PutSFTP class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SFTP_PROCESSOR_BASE_H__
+#define __SFTP_PROCESSOR_BASE_H__
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "../client/SFTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class SFTPProcessorBase : public core::Processor {
+ public:
+  SFTPProcessorBase(std::string name, utils::Identifier uuid);
+  virtual ~SFTPProcessorBase();
+
+  // Supported Properties
+  static core::Property Hostname;
+  static core::Property Port;
+  static core::Property Username;
+  static core::Property Password;
+  static core::Property PrivateKeyPath;
+  static core::Property PrivateKeyPassphrase;
+  static core::Property StrictHostKeyChecking;
+  static core::Property HostKeyFile;
+  static core::Property ConnectionTimeout;
+  static core::Property DataTimeout;
+  static core::Property SendKeepaliveOnTimeout;
+  static core::Property TargetSystemTimestampPrecision;
+  static core::Property ProxyType;
+  static core::Property ProxyHost;
+  static core::Property ProxyPort;
+  static core::Property HttpProxyUsername;
+  static core::Property HttpProxyPassword;
+
+  static constexpr char const *PROXY_TYPE_DIRECT = "DIRECT";
+  static constexpr char const *PROXY_TYPE_HTTP = "HTTP";
+  static constexpr char const *PROXY_TYPE_SOCKS = "SOCKS";
+
+  virtual void notifyStop() override;
+
+ protected:
+  std::shared_ptr<logging::Logger> logger_;
+
+  int64_t connection_timeout_;
+  int64_t data_timeout_;
+  std::string host_key_file_;
+  bool strict_host_checking_;
+  bool use_keepalive_on_timeout_;
+  bool use_compression_;
+  std::string proxy_type_;
+
+  void addSupportedCommonProperties(std::set<core::Property>& supported_properties);
+  void parseCommonPropertiesOnSchedule(const std::shared_ptr<core::ProcessContext>& context);
+  struct CommonProperties {
+    std::string hostname;
+    uint16_t port;
+    std::string username;
+    std::string password;
+    std::string private_key_path;
+    std::string private_key_passphrase;
+    std::string remote_path;
+    std::string proxy_host;
+    uint16_t proxy_port;
+    std::string proxy_username;
+    std::string proxy_password;
+
+    CommonProperties();
+  };
+  bool parseCommonPropertiesOnTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<FlowFileRecord>& flow_file, CommonProperties& common_properties);
+
+  static constexpr size_t CONNECTION_CACHE_MAX_SIZE = 8U;
+  struct ConnectionCacheKey {
+    std::string hostname;
+    uint16_t port;
+    std::string username;
+    std::string proxy_type;
+    std::string proxy_host;
+    uint16_t proxy_port;
+    std::string proxy_username;
+
+    bool operator<(const ConnectionCacheKey& other) const;
+    bool operator==(const ConnectionCacheKey& other) const;
+  };
+  std::mutex connections_mutex_;
+  std::map<ConnectionCacheKey, std::unique_ptr<utils::SFTPClient>> connections_;
+  std::list<ConnectionCacheKey> lru_;
+  std::unique_ptr<utils::SFTPClient> getConnectionFromCache(const ConnectionCacheKey& key);
+  void addConnectionToCache(const ConnectionCacheKey& key, std::unique_ptr<utils::SFTPClient>&& connection);
+
+  std::thread keepalive_thread_;
+  bool running_;
+  std::condition_variable keepalive_cv_;
+  void keepaliveThreadFunc();
+
+  void startKeepaliveThreadIfNeeded();
+  void cleanupConnectionCache();
+  std::unique_ptr<utils::SFTPClient> getOrCreateConnection(
+      const SFTPProcessorBase::ConnectionCacheKey& connection_cache_key,
+      const std::string& password,
+      const std::string& private_key_path,
+      const std::string& private_key_passphrase,
+      const std::string& proxy_password);
+
+  enum class CreateDirectoryHierarchyError : uint8_t {
+    CREATE_DIRECTORY_HIERARCHY_ERROR_OK = 0,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_STAT_FAILED,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_A_DIRECTORY,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_NOT_FOUND,
+    CREATE_DIRECTORY_HIERARCHY_ERROR_PERMISSION_DENIED,
+  };
+  CreateDirectoryHierarchyError createDirectoryHierarchy(utils::SFTPClient& client, const std::string& remote_path, bool disable_directory_listing);
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
diff --git a/extensions/sftp/tests/CMakeLists.txt b/extensions/sftp/tests/CMakeLists.txt
index 998d1ed..f8663f7 100644
--- a/extensions/sftp/tests/CMakeLists.txt
+++ b/extensions/sftp/tests/CMakeLists.txt
@@ -61,4 +61,4 @@
 	add_subdirectory(tools)
 else()
 	message("Could find Java and Maven to build SFTPTestServer, disabling SFTP tests")
-endif()
\ No newline at end of file
+endif()
diff --git a/extensions/sftp/tests/FetchSFTPTests.cpp b/extensions/sftp/tests/FetchSFTPTests.cpp
new file mode 100644
index 0000000..d7e95c7
--- /dev/null
+++ b/extensions/sftp/tests/FetchSFTPTests.cpp
@@ -0,0 +1,425 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/FetchSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/PutFile.h"
+#include "tools/SFTPTestServer.h"
+
+class FetchSFTPTestsFixture {
+ public:
+  FetchSFTPTestsFixture()
+  : src_dir(strdup("/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::FetchSFTP>();
+    LogTestController::getInstance().setTrace<processors::PutFile>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create temporary directories
+    testController.createTempDirectory(src_dir);
+    REQUIRE(src_dir != nullptr);
+    testController.createTempDirectory(dst_dir);
+    REQUIRE(dst_dir != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    plan = testController.createPlan();
+    generate_flow_file = plan->addProcessor(
+        "GenerateFlowFile",
+        "GenerateFlowFile");
+    update_attribute = plan->addProcessor("UpdateAttribute",
+         "UpdateAttribute",
+         core::Relationship("success", "d"),
+         true);
+    fetch_sftp = plan->addProcessor(
+        "FetchSFTP",
+        "FetchSFTP",
+        core::Relationship("success", "d"),
+        true);
+    plan->addProcessor("LogAttribute",
+        "LogAttribute",
+        { core::Relationship("success", "d"),
+          core::Relationship("comms.failure", "d"),
+          core::Relationship("not.found", "d"),
+          core::Relationship("permission.denied", "d") },
+          true);
+    put_file = plan->addProcessor("PutFile",
+         "PutFile",
+         core::Relationship("success", "d"),
+         true);
+
+    // Configure GenerateFlowFile processor
+    plan->setProperty(generate_flow_file, "File Size", "1B");
+
+    // Configure FetchSFTP processor
+    plan->setProperty(fetch_sftp, "Hostname", "localhost");
+    plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(fetch_sftp, "Username", "nifiuser");
+    plan->setProperty(fetch_sftp, "Password", "nifipassword");
+    plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
+    plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(fetch_sftp, "Use Compression", "false");
+
+    // Configure PutFile processor
+    plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
+    plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+    plan->setProperty(put_file, "Create Missing Directories", "true");
+  }
+
+  virtual ~FetchSFTPTestsFixture() {
+    free(src_dir);
+    free(dst_dir);
+    LogTestController::getInstance().reset();
+  }
+
+  // Create source file
+  void createFile(const std::string& relative_path, const std::string& content) {
+    std::fstream file;
+    std::stringstream ss;
+    ss << src_dir << "/vfs/" << relative_path;
+    utils::file::FileUtils::create_dir(utils::file::FileUtils::get_parent_path(ss.str())); // TODO
+    file.open(ss.str(), std::ios::out);
+    file << content;
+    file.close();
+  }
+
+  enum TestWhere {
+    IN_DESTINATION,
+    IN_SOURCE
+  };
+
+  void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(0 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(true == file.good());
+    std::stringstream content;
+    std::vector<char> buffer(1024U);
+    while (file) {
+      file.read(buffer.data(), buffer.size());
+      content << std::string(buffer.data(), file.gcount());
+    }
+    REQUIRE(expected_content == content.str());
+  }
+
+  void testFileNotExists(TestWhere where, const std::string& relative_path) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(-1 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(false == file.is_open());
+    REQUIRE(false == file.good());
+  }
+
+ protected:
+  char *src_dir;
+  char *dst_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> generate_flow_file;
+  std::shared_ptr<core::Processor> update_attribute;
+  std::shared_ptr<core::Processor> fetch_sftp;
+  std::shared_ptr<core::Processor> put_file;
+};
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch one file", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP public key authentication", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
+  plan->setProperty(fetch_sftp, "Private Key Passphrase", "privatekeypassword");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
+
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-existing file", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship not.found"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch non-readable file", "[FetchSFTP][basic]") {
+  if (getuid() == 0) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test/tstFile.ext").c_str(), 0000));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship permission.denied"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP fetch connection error", "[FetchSFTP][basic]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  /* Run it once normally to open the connection */
+  testController.runSession(plan, true);
+  plan->reset();
+
+  /* Stop the server to create a connection error */
+  sftp_server.reset();
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote file \"nifi_test/tstFile.ext\" due to an underlying SSH error"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship comms.failure"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File success", "[FetchSFTP][completion-strategy]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Delete File fail", "[FetchSFTP][completion-strategy]") {
+  if (getuid() == 0) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_DELETE_FILE);
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+  /* By making the parent directory non-writable we make it impossible do delete the source file */
+  REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test").c_str(), 0500));
+
+  testController.runSession(plan, true);
+
+  /* We should succeed even if the completion strategy fails */
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to remove remote file \"nifi_test/tstFile.ext\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(LogTestController::getInstance().contains("Completion Strategy is Delete File, but failed to delete remote file \"nifi_test/tstFile.ext\""));
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+#endif
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File success", "[FetchSFTP][completion-strategy]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
+  plan->setProperty(fetch_sftp, "Create Directory", "true");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP Completion Strategy Move File fail", "[FetchSFTP][completion-strategy]") {
+  plan->setProperty(fetch_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "nifi_done/");
+
+  /* The completion strategy should fail because the target directory does not exist and we don't create it */
+  plan->setProperty(fetch_sftp, "Create Directory", "false");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  /* We should succeed even if the completion strategy fails */
+  testFileNotExists(IN_SOURCE, "nifi_done/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Failed to rename remote file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(LogTestController::getInstance().contains("Completion Strategy is Move File, but failed to move file \"nifi_test/tstFile.ext\" to \"nifi_done/tstFile.ext\""));
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(FetchSFTPTestsFixture, "FetchSFTP expression language test", "[FetchSFTP]") {
+  plan->setProperty(update_attribute, "attr_Hostname", "localhost", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Port", std::to_string(sftp_server->getPort()), true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Username", "nifiuser", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Password", "nifipassword", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Private Key Path",
+  utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"), true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Private Key Passphrase", "privatekeypassword", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Remote File", "nifi_test/tstFile.ext", true /*dynamic*/);
+  plan->setProperty(update_attribute, "attr_Move Destination Directory", "nifi_done/", true /*dynamic*/);
+
+  plan->setProperty(fetch_sftp, "Hostname", "${'attr_Hostname'}");
+  plan->setProperty(fetch_sftp, "Port", "${'attr_Port'}");
+  plan->setProperty(fetch_sftp, "Username", "${'attr_Username'}");
+  plan->setProperty(fetch_sftp, "Password", "${'attr_Password'}");
+  plan->setProperty(fetch_sftp, "Private Key Path", "${'attr_Private Key Path'}");
+  plan->setProperty(fetch_sftp, "Private Key Passphrase", "${'attr_Private Key Passphrase'}");
+  plan->setProperty(fetch_sftp, "Remote File", "${'attr_Remote File'}");
+  plan->setProperty(fetch_sftp, "Move Destination Directory", "${'attr_Move Destination Directory'}");
+
+  plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_MOVE_FILE);
+  plan->setProperty(fetch_sftp, "Create Directory", "true");
+
+  createFile("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFileNotExists(IN_SOURCE, "nifi_test/tstFile.ext");
+  testFile(IN_SOURCE, "nifi_done/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
+  REQUIRE(LogTestController::getInstance().contains("from FetchSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.filename value:nifi_test/tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
new file mode 100644
index 0000000..f82b010
--- /dev/null
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -0,0 +1,907 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/ListSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "tools/SFTPTestServer.h"
+
+class ListSFTPTestsFixture {
+ public:
+  ListSFTPTestsFixture()
+  : src_dir(strdup("/tmp/sftps.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::ListSFTP>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create temporary directories
+    testController.createTempDirectory(src_dir);
+    REQUIRE(src_dir != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    createPlan();
+  }
+
+  virtual ~ListSFTPTestsFixture() {
+    free(src_dir);
+    LogTestController::getInstance().reset();
+  }
+
+  void createPlan(utils::Identifier* list_sftp_uuid = nullptr) {
+    plan = testController.createPlan();
+    if (list_sftp_uuid == nullptr) {
+      list_sftp = plan->addProcessor(
+          "ListSFTP",
+          "ListSFTP");
+    } else {
+      list_sftp = plan->addProcessor(
+          "ListSFTP",
+          *list_sftp_uuid,
+          "ListSFTP",
+          {core::Relationship("success", "d")});
+    }
+    log_attribute = plan->addProcessor("LogAttribute",
+                                       "LogAttribute",
+                                       core::Relationship("success", "d"),
+                                       true);
+
+    // Configure ListSFTP processor
+    plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
+    plan->setProperty(list_sftp, "Hostname", "localhost");
+    plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(list_sftp, "Username", "nifiuser");
+    plan->setProperty(list_sftp, "Password", "nifipassword");
+    plan->setProperty(list_sftp, "Search Recursively", "false");
+    plan->setProperty(list_sftp, "Follow symlink", "false");
+    plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
+    plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
+    plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
+    plan->setProperty(list_sftp, "Minimum File Size", "0 B");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
+    plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
+    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
+
+    // Configure LogAttribute processor
+    plan->setProperty(log_attribute, "FlowFiles To Log", "0");
+  }
+
+  // Create source file
+  void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
+    std::fstream file;
+    std::stringstream ss;
+    ss << src_dir << "/vfs/" << relative_path;
+    auto full_path = ss.str();
+    std::deque<std::string> parent_dirs;
+    std::string parent_dir = full_path;
+    while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
+      parent_dirs.push_front(parent_dir);
+    }
+    for (const auto& dir : parent_dirs) {
+      utils::file::FileUtils::create_dir(dir);
+    }
+    file.open(ss.str(), std::ios::out);
+    file << content;
+    file.close();
+    if (modification_timestamp != 0U) {
+      REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
+    }
+  }
+
+  void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
+    time_t now = time(nullptr);
+    return createFile(relative_path, content, now + modification_timediff);
+  }
+
+ protected:
+  char *src_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> list_sftp;
+  std::shared_ptr<core::Processor> log_attribute;
+};
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP public key authentication", "[ListSFTP][basic]") {
+  plan->setProperty(list_sftp, "Remote File", "nifi_test/tstFile.ext");
+  plan->setProperty(list_sftp, "Private Key Path", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/id_rsa"));
+  plan->setProperty(list_sftp, "Private Key Passphrase", "privatekeypassword");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully authenticated with publickey"));
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-existing dir", "[ListSFTP][basic]") {
+  plan->setProperty(list_sftp, "Remote Path", "nifi_test2/");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test2\", error: LIBSSH2_FX_NO_SUCH_FILE"));
+  REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list non-readable dir", "[ListSFTP][basic]") {
+  if (getuid() == 0) {
+    std::cerr << "!!!! This test does NOT work as root. Exiting. !!!!" << std::endl;
+    return;
+  }
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+  REQUIRE(0 == chmod((std::string(src_dir) + "/vfs/nifi_test").c_str(), 0000));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Failed to open remote directory \"nifi_test\", error: LIBSSH2_FX_PERMISSION_DENIED"));
+  REQUIRE(LogTestController::getInstance().contains("There are no files to list. Yielding."));
+}
+#endif
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list one file writes attributes", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/tstFile.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+  std::string mtime_str;
+  REQUIRE(true == getDateTimeStr(mtime, mtime_str));
+  uint64_t uid, gid;
+  REQUIRE(true == utils::file::FileUtils::get_uid_gid(file, uid, gid));
+  uint32_t permissions;
+  REQUIRE(true == utils::file::FileUtils::get_permissions(file, permissions));
+  std::stringstream permissions_ss;
+  permissions_ss << std::setfill('0') << std::setw(4) << std::oct << permissions;
+
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.host value:localhost"));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.remote.port value:" + std::to_string(sftp_server->getPort())));
+  REQUIRE(LogTestController::getInstance().contains("key:sftp.listing.user value:nifiuser"));
+  REQUIRE(LogTestController::getInstance().contains("key:file.owner value:" + std::to_string(uid)));
+  REQUIRE(LogTestController::getInstance().contains("key:file.group value:" + std::to_string(gid)));
+  REQUIRE(LogTestController::getInstance().contains("key:file.permissions value:" + permissions_ss.str()));
+  REQUIRE(LogTestController::getInstance().contains("key:file.size value:14"));
+  REQUIRE(LogTestController::getInstance().contains("key:file.lastModifiedTime value:" + mtime_str));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir no recursion", "[ListSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP list two files one in a subdir with recursion", "[ListSFTP][basic]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/subdir/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Age too young", "[ListSFTP][file-age]") {
+  plan->setProperty(list_sftp, "Minimum File Age", "2 hours");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is younger than the Minimum File Age"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Age too old", "[ListSFTP][file-age]") {
+  plan->setProperty(list_sftp, "Maximum File Age", "1 min");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is older than the Maximum File Age"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Minimum File Size too small", "[ListSFTP][file-size]") {
+  plan->setProperty(list_sftp, "Minimum File Size", "1 MB");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is smaller than the Minimum File Size: 14 B < 1048576 B"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Maximum File Size too large", "[ListSFTP][file-size]") {
+  plan->setProperty(list_sftp, "Maximum File Size", "4 B");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/tstFile.ext\" because it is larger than the Maximum File Size: 14 B > 4 B"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP File Filter Regex", "[ListSFTP][file-filter-regex]") {
+  plan->setProperty(list_sftp, "File Filter Regex", "^.*2.*$");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test with longer content 2");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Ignoring \"nifi_test/file1.ext\" because it did not match the File Filter Regex \"^.*2.*$\""));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Path Filter Regex", "[ListSFTP][path-filter-regex]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+  plan->setProperty(list_sftp, "Path Filter Regex", "^.*foobar.*$");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/foobar/file2.ext", "Test content 2");
+  createFileWithModificationTimeDiff("nifi_test/notbar/file3.ext", "Test with longer content 3");
+
+  testController.runSession(plan, true);
+
+  /* file1.ext is in the root */
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  /* file2.ext is in a matching subdirectory */
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+  /* file3.ext is in a non-matching subdirectory */
+  REQUIRE(LogTestController::getInstance().contains("Not recursing into \"nifi_test/notbar\" because it did not match the Path Filter Regex \"^.*foobar.*$\""));
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file3.ext"));
+}
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false file symlink", "[ListSFTP][follow-symlink]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  auto file1 = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto file2 = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
+  REQUIRE(0 == symlink(file1.c_str(), file2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/file2.ext\""));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true file symlink", "[ListSFTP][follow-symlink]") {
+  plan->setProperty(list_sftp, "Follow symlink", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  auto file1 = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto file2 = std::string(src_dir) + "/vfs/nifi_test/file2.ext";
+  REQUIRE(0 == symlink(file1.c_str(), file2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink false directory symlink", "[ListSFTP][follow-symlink]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
+  auto dir1 = std::string(src_dir) + "/vfs/nifi_test/dir1";
+  auto dir2 = std::string(src_dir) + "/vfs/nifi_test/dir2";
+  REQUIRE(0 == symlink(dir1.c_str(), dir2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping non-regular, non-directory file \"nifi_test/dir2\""));
+}
+#endif
+
+#ifndef WIN32
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Follow symlink true directory symlink", "[ListSFTP][follow-symlink]") {
+  plan->setProperty(list_sftp, "Search Recursively", "true");
+  plan->setProperty(list_sftp, "Follow symlink", "true");
+
+  createFileWithModificationTimeDiff("nifi_test/dir1/file1.ext", "Test content 1");
+  auto dir1 = std::string(src_dir) + "/vfs/nifi_test/dir1";
+  auto dir2 = std::string(src_dir) + "/vfs/nifi_test/dir2";
+  REQUIRE(0 == symlink(dir1.c_str(), dir2.c_str()));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir1"));
+  REQUIRE(LogTestController::getInstance().contains("key:path value:nifi_test/dir2"));
+}
+#endif
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one new file", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file one older file", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -360 /* 6 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\", because it is not new."));
+  REQUIRE(LogTestController::getInstance().contains("The latest listed entry timestamp is the same as the last listed entry timestamp"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file another file with the same timestamp", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  /* We must sleep to avoid triggering the listing lag. */
+  std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+  createFile("nifi_test/file2.ext", "Test content 2", mtime);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps one file timestamp updated", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  REQUIRE(true == utils::file::FileUtils::set_last_write_time(file, mtime + 1));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  /* We must sleep to avoid triggering the listing lag. */
+  std::this_thread::sleep_for(std::chrono::milliseconds(1500));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("and all files for that timestamp has been processed. Yielding."));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Timestamps state file"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+                                                    "Hostname: \"localhost\" vs. \"localhost\", "
+                                                    "Username: \"nifiuser\" vs. \"nifiuser\", "
+                                                    "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed configuration", "[ListSFTP][tracking-timestamps]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Timestamps");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  "Hostname: \"localhost\" vs. \"localhost\", "
+  "Username: \"nifiuser\" vs. \"nifiuser\", "
+  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:tstFile.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/tstFile.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one new file", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file in tracking window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -360 /* 6 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file one older file outside tracking window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -6 * 3600 /* 6 hours ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file2.ext\" because it has an older timestamp than the minimum timestamp to list"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file another file with the same timestamp", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFile("nifi_test/file2.ext", "Test content 2", mtime);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file timestamp updated", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  REQUIRE(true == utils::file::FileUtils::set_last_write_time(file, mtime + 1));
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with newer timestamp"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities one file size changes", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  auto file = std::string(src_dir) + "/vfs/nifi_test/file1.ext";
+  auto mtime = utils::file::FileUtils::last_write_time(file);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFile("nifi_test/file1.ext", "Longer test content 1", mtime);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Found file \"nifi_test/file1.ext\" with different size: 14 -> 21"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("Skipping file \"nifi_test/file1.ext\" because it has not changed"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Entities state file"));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  utils::Identifier list_sftp_uuid;
+  REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
+  createPlan(&list_sftp_uuid);
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  "Hostname: \"localhost\" vs. \"localhost\", "
+  "Username: \"nifiuser\" vs. \"nifiuser\", "
+  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed configuration", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+
+  plan->reset();
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  plan->setProperty(list_sftp, "Remote Path", "/nifi_test");
+
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  "Hostname: \"localhost\" vs. \"localhost\", "
+  "Username: \"nifiuser\" vs. \"nifiuser\", "
+  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity outside window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
+  plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1", -20*60 /* 20 minutes ago */);
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("Skipping \"nifi_test/file1.ext\" because it has an older timestamp than the minimum timestamp to list"));
+  REQUIRE(false == LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+}
+
+TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities Initial Listing Target Tracking Time Window entity inside window", "[ListSFTP][tracking-entities]") {
+  plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
+  plan->setProperty(list_sftp, "Entity Tracking Initial Listing Target", processors::ListSFTP::ENTITY_TRACKING_INITIAL_LISTING_TARGET_TRACKING_TIME_WINDOW);
+  plan->setProperty(list_sftp, "Entity Tracking Time Window", "10 minutes");
+
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
+  REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+}
diff --git a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
new file mode 100644
index 0000000..0282cb9
--- /dev/null
+++ b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
@@ -0,0 +1,269 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include <algorithm>
+#include <functional>
+#include <iterator>
+#include <random>
+
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "processors/FetchSFTP.h"
+#include "processors/ListSFTP.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/PutFile.h"
+#include "tools/SFTPTestServer.h"
+
+class ListThenFetchSFTPTestsFixture {
+ public:
+  ListThenFetchSFTPTestsFixture()
+  : src_dir(strdup("/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::FlowController>();
+    LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
+    LogTestController::getInstance().setDebug<minifi::core::ProcessGroup>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::utils::SFTPClient>();
+    LogTestController::getInstance().setTrace<processors::ListSFTP>();
+    LogTestController::getInstance().setTrace<processors::FetchSFTP>();
+    LogTestController::getInstance().setTrace<processors::PutFile>();
+    LogTestController::getInstance().setDebug<processors::LogAttribute>();
+    LogTestController::getInstance().setDebug<SFTPTestServer>();
+
+    // Create temporary directories
+    testController.createTempDirectory(src_dir);
+    REQUIRE(src_dir != nullptr);
+    testController.createTempDirectory(dst_dir);
+    REQUIRE(dst_dir != nullptr);
+
+    // Start SFTP server
+    sftp_server = std::unique_ptr<SFTPTestServer>(new SFTPTestServer(src_dir));
+    REQUIRE(true == sftp_server->start());
+
+    // Build MiNiFi processing graph
+    plan = testController.createPlan();
+    list_sftp = plan->addProcessor(
+        "ListSFTP",
+        "ListSFTP");
+    fetch_sftp = plan->addProcessor(
+        "FetchSFTP",
+        "FetchSFTP",
+        core::Relationship("success", "d"),
+        true);
+    log_attribute = plan->addProcessor("LogAttribute",
+        "LogAttribute",
+        { core::Relationship("success", "d"),
+          core::Relationship("comms.failure", "d"),
+          core::Relationship("not.found", "d"),
+          core::Relationship("permission.denied", "d") },
+          true);
+    put_file = plan->addProcessor("PutFile",
+         "PutFile",
+         core::Relationship("success", "d"),
+         true);
+
+    // Configure ListSFTP processor
+    plan->setProperty(list_sftp, "Listing Strategy", processors::ListSFTP::LISTING_STRATEGY_TRACKING_TIMESTAMPS);
+    plan->setProperty(list_sftp, "Hostname", "localhost");
+    plan->setProperty(list_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(list_sftp, "Username", "nifiuser");
+    plan->setProperty(list_sftp, "Password", "nifipassword");
+    plan->setProperty(list_sftp, "Search Recursively", "false");
+    plan->setProperty(list_sftp, "Follow symlink", "false");
+    plan->setProperty(list_sftp, "Ignore Dotted Files", "false");
+    plan->setProperty(list_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(list_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(list_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", processors::ListSFTP::TARGET_SYSTEM_TIMESTAMP_PRECISION_AUTO_DETECT);
+    plan->setProperty(list_sftp, "Minimum File Age", "0 sec");
+    plan->setProperty(list_sftp, "Minimum File Size", "0 B");
+    plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
+    plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
+    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
+
+    // Configure FetchSFTP processor
+    plan->setProperty(fetch_sftp, "Hostname", "localhost");
+    plan->setProperty(fetch_sftp, "Port", std::to_string(sftp_server->getPort()));
+    plan->setProperty(fetch_sftp, "Username", "nifiuser");
+    plan->setProperty(fetch_sftp, "Password", "nifipassword");
+    plan->setProperty(fetch_sftp, "Completion Strategy", processors::FetchSFTP::COMPLETION_STRATEGY_NONE);
+    plan->setProperty(fetch_sftp, "Connection Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Data Timeout", "30 sec");
+    plan->setProperty(fetch_sftp, "Strict Host Key Checking", "false");
+    plan->setProperty(fetch_sftp, "Send Keep Alive On Timeout", "true");
+    plan->setProperty(fetch_sftp, "Use Compression", "false");
+    plan->setProperty(fetch_sftp, "Remote File", "${path}/${filename}");
+
+    // Configure LogAttribute processor
+    plan->setProperty(log_attribute, "FlowFiles To Log", "0");
+
+    // Configure PutFile processor
+    plan->setProperty(put_file, "Directory", std::string(dst_dir) + "/${path}");
+    plan->setProperty(put_file, "Conflict Resolution Strategy", processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL);
+    plan->setProperty(put_file, "Create Missing Directories", "true");
+  }
+
+  virtual ~ListThenFetchSFTPTestsFixture() {
+    free(src_dir);
+    free(dst_dir);
+    LogTestController::getInstance().reset();
+  }
+
+  // Create source file
+  void createFile(const std::string& relative_path, const std::string& content, uint64_t modification_timestamp = 0U) {
+    std::fstream file;
+    std::stringstream ss;
+    ss << src_dir << "/vfs/" << relative_path;
+    auto full_path = ss.str();
+    std::deque<std::string> parent_dirs;
+    std::string parent_dir = full_path;
+    while ((parent_dir = utils::file::FileUtils::get_parent_path(parent_dir)) != "") {
+      parent_dirs.push_front(parent_dir);
+    }
+    for (const auto& dir : parent_dirs) {
+      utils::file::FileUtils::create_dir(dir);
+    }
+    file.open(ss.str(), std::ios::out);
+    file << content;
+    file.close();
+    if (modification_timestamp != 0U) {
+      REQUIRE(true == utils::file::FileUtils::set_last_write_time(full_path, modification_timestamp));
+    }
+  }
+
+  void createFileWithModificationTimeDiff(const std::string& relative_path, const std::string& content, int64_t modification_timediff = -300 /*5 minutes ago*/) {
+    time_t now = time(nullptr);
+    return createFile(relative_path, content, now + modification_timediff);
+  }
+
+  enum TestWhere {
+    IN_DESTINATION,
+    IN_SOURCE
+  };
+
+  void testFile(TestWhere where, const std::string& relative_path, const std::string& expected_content) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(0 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(true == file.good());
+    std::stringstream content;
+    std::vector<char> buffer(1024U);
+    while (file) {
+      file.read(buffer.data(), buffer.size());
+      content << std::string(buffer.data(), file.gcount());
+    }
+    REQUIRE(expected_content == content.str());
+  }
+
+  void testFileNotExists(TestWhere where, const std::string& relative_path) {
+    std::stringstream resultFile;
+    if (where == IN_DESTINATION) {
+      resultFile << dst_dir << "/" << relative_path;
+    } else {
+      resultFile << src_dir << "/vfs/" << relative_path;
+#ifndef WIN32
+      /* Workaround for mina-sshd setting the read file's permissions to 0000 */
+      REQUIRE(-1 == chmod(resultFile.str().c_str(), 0644));
+#endif
+    }
+    std::ifstream file(resultFile.str());
+    REQUIRE(false == file.is_open());
+    REQUIRE(false == file.good());
+  }
+
+ protected:
+  char *src_dir;
+  char *dst_dir;
+  std::unique_ptr<SFTPTestServer> sftp_server;
+  TestController testController;
+  std::shared_ptr<TestPlan> plan;
+  std::shared_ptr<core::Processor> list_sftp;
+  std::shared_ptr<core::Processor> fetch_sftp;
+  std::shared_ptr<core::Processor> log_attribute;
+  std::shared_ptr<core::Processor> put_file;
+};
+
+TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP one file", "[ListThenFetchSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/tstFile.ext", "Test content 1");
+
+  testController.runSession(plan, true);
+
+  testFile(IN_SOURCE, "nifi_test/tstFile.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/tstFile.ext", "Test content 1");
+}
+
+TEST_CASE_METHOD(ListThenFetchSFTPTestsFixture, "ListSFTP then FetchSFTP two files", "[ListThenFetchSFTP][basic]") {
+  createFileWithModificationTimeDiff("nifi_test/file1.ext", "Test content 1");
+  createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2");
+
+  /* ListSFTP */
+  plan->runNextProcessor();
+
+  /* FetchSFTP */
+  plan->runNextProcessor();
+  plan->runCurrentProcessor();
+
+  /* LogAttribute */
+  plan->runNextProcessor();
+
+  /* PutFile */
+  plan->runNextProcessor();
+  plan->runCurrentProcessor();
+
+  testFile(IN_SOURCE, "nifi_test/file1.ext", "Test content 1");
+  testFile(IN_DESTINATION, "nifi_test/file1.ext", "Test content 1");
+  testFile(IN_SOURCE, "nifi_test/file2.ext", "Test content 2");
+  testFile(IN_DESTINATION, "nifi_test/file2.ext", "Test content 2");
+}
diff --git a/extensions/sftp/tests/PutSFTPTests.cpp b/extensions/sftp/tests/PutSFTPTests.cpp
index c59a7e6..b9189df 100644
--- a/extensions/sftp/tests/PutSFTPTests.cpp
+++ b/extensions/sftp/tests/PutSFTPTests.cpp
@@ -34,6 +34,9 @@
 #include <functional>
 #include <iterator>
 #include <random>
+#ifndef WIN32
+#include <unistd.h>
+#endif
 
 #include "TestBase.h"
 #include "utils/StringUtils.h"
@@ -523,7 +526,10 @@
 
 #ifndef WIN32
 TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP set uid and gid", "[PutSFTP]") {
-  std::cerr << "!!!! This test ONLY works as root, because it needs to chown !!!!" << std::endl;
+  if (getuid() != 0) {
+    std::cerr << "!!!! This test ONLY works as root, because it needs to chown. Exiting. !!!!" << std::endl;
+    return;
+  }
   plan->setProperty(put, "Remote Owner", "1234");
   plan->setProperty(put, "Remote Group", "4567");
 
diff --git a/extensions/sftp/tests/tools/SFTPTestServer.cpp b/extensions/sftp/tests/tools/SFTPTestServer.cpp
index 90efe2f..681b88f 100644
--- a/extensions/sftp/tests/tools/SFTPTestServer.cpp
+++ b/extensions/sftp/tests/tools/SFTPTestServer.cpp
@@ -78,16 +78,11 @@
   /* fork */
   pid_t pid = fork();
   if (pid == 0) {
-    /* Set the child process's pgid to its pid, so we will be able to kill the entire process group */
-    if (setpgid(0, 0) != 0) {
-      std::cerr << "Failed to set PGID, errno: " << strerror(errno) << std::endl;
-      exit(-1);
-    }
     /* execv */
     std::vector<char*> args(4U);
     args[0] = strdup("/bin/sh");
     args[1] = strdup("-c");
-    args[2] = strdup(("java -Djava.security.egd=file:/dev/./urandom -jar " + jar_path_ + " -w " + working_directory_ + " -k " + host_key_file_ + " &> " + server_log_file_path).c_str());
+    args[2] = strdup(("exec java -Djava.security.egd=file:/dev/./urandom -jar " + jar_path_ + " -w " + working_directory_ + " -k " + host_key_file_ + " >" + server_log_file_path + " 2>&1").c_str());
     args[3] = nullptr;
     execv("/bin/sh", args.data());
     std::cerr << "Failed to start server, errno: " << strerror(errno) << std::endl;
@@ -127,7 +122,7 @@
   throw std::runtime_error("Not implemented");
 #else
   if (server_pid_ != -1) {
-    if (::kill(-server_pid_, SIGTERM) != 0) {
+    if (::kill(server_pid_, SIGTERM) != 0) {
       logger_->log_error("Failed to kill child process, error: %s", strerror(errno));
       return false;
     }
diff --git a/libminifi/include/core/Property.h b/libminifi/include/core/Property.h
index 4f80689..2553c09 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -359,7 +359,7 @@
   }
 
   static bool StringToDateTime(const std::string& input, int64_t& output) {
-    int64_t temp = pareDateTimeStr(input);
+    int64_t temp = parseDateTimeStr(input);
     if (temp == -1) {
       return false;
     }
diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h
index a20059a..437bd71 100644
--- a/libminifi/include/utils/TimeUtil.h
+++ b/libminifi/include/utils/TimeUtil.h
@@ -23,6 +23,8 @@
 #include <iomanip>
 #include <sstream>
 #include <chrono>
+#include <array>
+#include <limits>
 
 #define TIME_FORMAT "%Y-%m-%d %H:%M:%S"
 
@@ -67,7 +69,7 @@
  * @param str the datetime string
  * @returns Unix timestamp
  */
-inline int64_t pareDateTimeStr(const std::string &str) {
+inline int64_t parseDateTimeStr(const std::string &str) {
   /**
    * There is no strptime on Windows. As long as we have to parse a single date format this is not so bad,
    * but if multiple formats will have to be supported in the future, it might be worth it to include
@@ -123,4 +125,22 @@
   return time + timezone_offset;
 }
 
+inline bool getDateTimeStr(int64_t unix_timestamp, std::string& date_time_str) {
+  if (unix_timestamp > (std::numeric_limits<time_t>::max)() || unix_timestamp < (std::numeric_limits<time_t>::lowest)()) {
+    return false;
+  }
+  time_t time = static_cast<time_t>(unix_timestamp);
+  struct tm* gmt = gmtime(&time);
+  if (gmt == nullptr) {
+    return false;
+  }
+  std::array<char, 64U> buf;
+  if (strftime(buf.data(), buf.size(), "%Y-%m-%dT%H:%M:%SZ", gmt) == 0U) {
+    return false;
+  }
+
+  date_time_str = buf.data();
+  return true;
+}
+
 #endif
diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h
index 5a57f91..0b000ed 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -33,6 +33,7 @@
 #else
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <utime.h>
 #include <dirent.h>
 #include <errno.h>
 #endif
@@ -50,6 +51,8 @@
 #include <tchar.h> // _tcscpy,_tcscat,_tcscmp
 #include <string> // string
 #include <algorithm> // replace
+#include <sys/types.h>
+#include <sys/utime.h> // _utime64
 #endif
 #ifdef __APPLE__
 #include <mach-o/dyld.h>
@@ -79,16 +82,19 @@
 
   FileUtils() = delete;
 
-  /**
-   * Deletes a directory, deleting recursively if delete_files_recursively is true
-   * @param path current path to delete
-   * @param delete_files_recursively deletes recursively
+  /*
+   * Get the platform-specific path separator.
+   * @param force_posix returns the posix path separator ('/'), even when not on posix. Useful when dealing with remote posix paths.
+   * @return the path separator character
    */
-
-  static char get_separator()
+  static char get_separator(bool force_posix = false)
   {
 #ifdef WIN32
-    return '\\';
+    if (force_posix) {
+      return '/';
+    } else {
+      return '\\';
+    }
 #else
     return '/';
 #endif
@@ -200,6 +206,20 @@
     return 0;
   }
 
+  static bool set_last_write_time(const std::string &path, uint64_t write_time) {
+#ifdef WIN32
+    struct __utimbuf64 utim;
+    utim.actime = write_time;
+    utim.modtime = write_time;
+    return _utime64(path.c_str(), &utim) == 0U;
+#else
+    struct utimbuf utim;
+    utim.actime = write_time;
+    utim.modtime = write_time;
+    return utime(path.c_str(), &utim) == 0U;
+#endif
+  }
+
 #ifndef WIN32
   static bool get_permissions(const std::string &path, uint32_t &permissions) {
     struct stat result;
@@ -416,76 +436,99 @@
 #endif
   }
 
-  static std::string concat_path(const std::string& root, const std::string& child) {
+  static std::string concat_path(const std::string& root, const std::string& child, bool force_posix = false) {
     if (root.empty()) {
       return child;
     }
     std::stringstream new_path;
-    if (root.back() == get_separator()) {
+    if (root.back() == get_separator(force_posix)) {
       new_path << root << child;
     } else {
-      new_path << root << get_separator() << child;
+      new_path << root << get_separator(force_posix) << child;
     }
     return new_path.str();
   }
 
-  static std::string get_parent_path(const std::string& path) {
+  static std::tuple<std::string /*parent_path*/, std::string /*child_path*/> split_path(const std::string& path, bool force_posix = false) {
     if (path.empty()) {
-      /* Empty path has no parent */
-      return "";
+      /* Empty path has no parent and no child*/
+      return std::make_tuple("", "");
     }
     bool absolute = false;
     size_t root_pos = 0U;
 #ifdef WIN32
-    if (path[0] == '\\') {
-      absolute = true;
-      if (path.size() < 2U) {
-        return "";
-      }
-      if (path[1] == '\\') {
-        if (path.size() >= 4U &&
-           (path[2] == '?' || path[2] == '.') &&
-            path[3] == '\\') {
-          /* Probably an UNC path */
-          root_pos = 4U;
-        } else {
-          /* Probably a \\server\-type path */
-          root_pos = 2U;
+    if (!force_posix) {
+      if (path[0] == '\\') {
+        absolute = true;
+        if (path.size() < 2U) {
+          return std::make_tuple("", "");
         }
-        root_pos = path.find_first_of("\\", root_pos);
-        if (root_pos == std::string::npos) {
-          return "";
+        if (path[1] == '\\') {
+          if (path.size() >= 4U &&
+             (path[2] == '?' || path[2] == '.') &&
+              path[3] == '\\') {
+            /* Probably an UNC path */
+            root_pos = 4U;
+          } else {
+            /* Probably a \\server\-type path */
+            root_pos = 2U;
+          }
+          root_pos = path.find_first_of("\\", root_pos);
+          if (root_pos == std::string::npos) {
+            return std::make_tuple("", "");
+          }
         }
+      } else if (path.size() >= 3U &&
+                 toupper(path[0]) >= 'A' &&
+                 toupper(path[0]) <= 'Z' &&
+                 path[1] == ':' &&
+                 path[2] == '\\') {
+        absolute = true;
+        root_pos = 2U;
       }
-    } else if (path.size() >= 3U &&
-               toupper(path[0]) >= 'A' &&
-               toupper(path[0]) <= 'Z' &&
-               path[1] == ':' &&
-               path[2] == '\\') {
-      absolute = true;
-      root_pos = 2U;
-    }
+    } else {
 #else
-    if (path[0] == '/') {
-      absolute = true;
-      root_pos = 0U;
-    }
+    if (true) {
 #endif
+      if (path[0] == '/') {
+        absolute = true;
+        root_pos = 0U;
+      }
+    }
+    /* Maybe we are just a single relative child */
+    if (!absolute && path.find(get_separator(force_posix)) == std::string::npos) {
+      return std::make_tuple("", path);
+    }
     /* Ignore trailing separators */
     size_t last_pos = path.size() - 1;
-    while (last_pos > root_pos && path[last_pos] == get_separator()) {
+    while (last_pos > root_pos && path[last_pos] == get_separator(force_posix)) {
       last_pos--;
     }
     if (absolute && last_pos == root_pos) {
       /* This means we are only a root */
-      return "";
+      return std::make_tuple("", "");
     }
-    /* Find parent */
-    size_t last_separator = path.find_last_of(get_separator(), last_pos);
+    /* Find parent-child separator */
+    size_t last_separator = path.find_last_of(get_separator(force_posix), last_pos);
     if (last_separator == std::string::npos || last_separator < root_pos) {
-      return "";
+      return std::make_tuple("", "");
     }
-    return path.substr(0, last_separator + 1);
+    std::string parent = path.substr(0, last_separator + 1);
+    std::string child = path.substr(last_separator + 1);
+
+    return std::make_tuple(std::move(parent), std::move(child));
+  }
+
+  static std::string get_parent_path(const std::string& path, bool force_posix = false) {
+    std::string parent_path;
+    std::tie(parent_path, std::ignore) = split_path(path, force_posix);
+    return parent_path;
+  }
+
+  static std::string get_child_path(const std::string& path, bool force_posix = false) {
+    std::string child_path;
+    std::tie(std::ignore, child_path) = split_path(path, force_posix);
+    return child_path;
   }
 
   /*
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 73dc77a..2ef6f0b 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -70,7 +70,7 @@
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     properties_[new_property.getName()] = new_property;
@@ -102,7 +102,7 @@
   auto &&it = properties_.find(name);
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.addValue(value);
     properties_[new_property.getName()] = new_property;
@@ -125,7 +125,7 @@
   auto it = properties_.find(prop.getName());
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     properties_[new_property.getName()] = new_property;
@@ -153,7 +153,7 @@
   auto it = properties_.find(prop.getName());
 
   if (it != properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     properties_[new_property.getName()] = new_property;
@@ -230,7 +230,7 @@
   auto &&it = dynamic_properties_.find(name);
 
   if (it != dynamic_properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.setValue(value);
     new_property.setSupportsExpressionLanguage(true);
@@ -248,7 +248,7 @@
   auto &&it = dynamic_properties_.find(name);
 
   if (it != dynamic_properties_.end()) {
-    Property &orig_property = it->second;
+    Property orig_property = it->second;
     Property new_property = orig_property;
     new_property.addValue(value);
     new_property.setSupportsExpressionLanguage(true);
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 43eead4..f496b3f 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -96,6 +96,23 @@
   return processor;
 }
 
+std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
+  if (nullptr == ptr) {
+    throw std::exception();
+  }
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
+
+  processor->setName(name);
+
+  return addProcessor(processor, name, relationships, linkToPrevious);
+}
+
 std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships, bool linkToPrevious) {
   if (finalized) {
     return nullptr;
@@ -108,15 +125,7 @@
 
   std::cout << "generated " << uuid.to_string() << std::endl;
 
-  auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid);
-  if (nullptr == ptr) {
-    throw std::exception();
-  }
-  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
-
-  processor->setName(name);
-
-  return addProcessor(processor, name, relationships, linkToPrevious);
+  return addProcessor(processor_name, uuid, name, relationships, linkToPrevious);
 }
 
 bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
@@ -169,6 +178,7 @@
   }
   std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
   process_sessions_.push_back(current_session);
+  current_flowfile_ = nullptr;
   processor->incrementActiveTasks();
   processor->setScheduledState(core::ScheduledState::RUNNING);
   if (verify != nullptr) {
@@ -178,7 +188,30 @@
     processor->onTrigger(context, current_session);
   }
   current_session->commit();
-  current_flowfile_ = current_session->get();
+  return location + 1 < processor_queue_.size();
+}
+
+bool TestPlan::runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) {
+  if (!finalized) {
+    finalize();
+  }
+  logger_->log_info("Rerunning current processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size());
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  std::shared_ptr<core::Processor> processor = processor_queue_.at(location);
+  std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location);
+  std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context);
+  process_sessions_.push_back(current_session);
+  current_flowfile_ = nullptr;
+  processor->incrementActiveTasks();
+  processor->setScheduledState(core::ScheduledState::RUNNING);
+  if (verify != nullptr) {
+    verify(context, current_session);
+  } else {
+    logger_->log_info("Running %s", processor->getName());
+    processor->onTrigger(context, current_session);
+  }
+  current_session->commit();
   return location + 1 < processor_queue_.size();
 }
 
@@ -187,6 +220,9 @@
 }
 
 std::shared_ptr<core::FlowFile> TestPlan::getCurrentFlowFile() {
+  if (current_flowfile_ == nullptr) {
+    current_flowfile_ = process_sessions_.at(location)->get();
+  }
   return current_flowfile_;
 }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 33bf40e..21a2f43 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -187,12 +187,17 @@
   std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
                                                 bool linkToPrevious = false);
 
+  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
+                                                bool linkToPrevious = false);
+
   bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
 
   void reset();
 
   bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
 
+  bool runCurrentProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
+
   std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> getProvenanceRecords();
 
   std::shared_ptr<core::FlowFile> getCurrentFlowFile();
@@ -232,7 +237,6 @@
 
   int location;
 
-  std::shared_ptr<core::ProcessSession> current_session_;
   std::shared_ptr<core::FlowFile> current_flowfile_;
 
   std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp
index 8b3d3ae..85c783b 100644
--- a/libminifi/test/unit/FileUtilsTests.cpp
+++ b/libminifi/test/unit/FileUtilsTests.cpp
@@ -59,6 +59,28 @@
 #endif
 }
 
+TEST_CASE("TestFileUtils::get_child_path", "[TestGetChildPath]") {
+#ifdef WIN32
+  REQUIRE("bar" == FileUtils::get_child_path("foo\\bar"));
+  REQUIRE("bar\\" == FileUtils::get_child_path("foo\\bar\\"));
+  REQUIRE("bar" == FileUtils::get_child_path("C:\\foo\\bar"));
+  REQUIRE("bar\\" == FileUtils::get_child_path("C:\\foo\\bar\\"));
+  REQUIRE("foo" == FileUtils::get_child_path("C:\\foo"));
+  REQUIRE("foo\\" == FileUtils::get_child_path("C:\\foo\\"));
+  REQUIRE("" == FileUtils::get_child_path("C:\\"));
+  REQUIRE("" == FileUtils::get_child_path("C:\\\\"));
+#else
+  REQUIRE("bar" == FileUtils::get_child_path("foo/bar"));
+  REQUIRE("bar/" == FileUtils::get_child_path("foo/bar/"));
+  REQUIRE("bar" == FileUtils::get_child_path("/foo/bar"));
+  REQUIRE("bar/" == FileUtils::get_child_path("/foo/bar/"));
+  REQUIRE("foo" == FileUtils::get_child_path("/foo"));
+  REQUIRE("foo/" == FileUtils::get_child_path("/foo/"));
+  REQUIRE("" == FileUtils::get_child_path("/"));
+  REQUIRE("" == FileUtils::get_child_path("//"));
+#endif
+}
+
 TEST_CASE("TestFileUtils::get_executable_path", "[TestGetExecutablePath]") {
   std::string executable_path = FileUtils::get_executable_path();
   std::cerr << "Executable path: " << executable_path << std::endl;