MINIFICPP-1228 - Refactor ResourceClaim reference counting

MINIFICPP-1228 - Processors can own FlowFiles.

MINIFICPP-1228 - From now on it isn't the Connection's responsibility to persist the FlowFiles.

MINIFICPP-1228 - BinFiles properly restores files.

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #807
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3479070..e38993f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -768,6 +768,10 @@
 
 registerTest("${TEST_DIR}/flow-tests")
 
+if (NOT DISABLE_ROCKSDB AND NOT DISABLE_LIBARCHIVE)
+	registerTest("${TEST_DIR}/persistence-tests")
+endif()
+
 include(BuildDocs)
 
 include(DockerConfig)
diff --git a/extensions/libarchive/BinFiles.cpp b/extensions/libarchive/BinFiles.cpp
index bbb32cc..a7881ba 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -46,6 +46,7 @@
 core::Property BinFiles::MaxBinCount("Maximum number of Bins", "Specifies the maximum number of bins that can be held in memory at any one time", "100");
 core::Relationship BinFiles::Original("original", "The FlowFiles that were used to create the bundle");
 core::Relationship BinFiles::Failure("failure", "If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure");
+core::Relationship BinFiles::Self("__self__", "Marks the FlowFile to be owned by this processor");
 const char *BinFiles::FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
 const char *BinFiles::FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
 const char *BinFiles::FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
@@ -153,7 +154,7 @@
 void BinManager::removeOldestBin() {
   std::lock_guard < std::mutex > lock(mutex_);
   uint64_t olddate = ULLONG_MAX;
-  std::unique_ptr < std::deque<std::unique_ptr<Bin>>>*oldqueue;
+  std::unique_ptr < std::deque<std::unique_ptr<Bin>>>* oldqueue;
   for (std::map<std::string, std::unique_ptr<std::deque<std::unique_ptr<Bin>>>>::iterator it=groupBinMap_.begin(); it !=groupBinMap_.end(); ++it) {
     std::unique_ptr < std::deque<std::unique_ptr<Bin>>>&queue = it->second;
     if (!queue->empty()) {
@@ -235,6 +236,28 @@
 }
 
 void BinFiles::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+  // Rollback is not viable for this processor!!
+  {
+    // process resurrected FlowFiles first
+    auto flowFiles = file_store_.getNewFlowFiles();
+    // these are already processed FlowFiles, that we own
+    bool hadFailure = false;
+    for (auto &file : flowFiles) {
+      std::string groupId = getGroupId(context.get(), file);
+      bool offer = this->binManager_.offer(groupId, file);
+      if (!offer) {
+        session->transfer(file, Failure);
+        hadFailure = true;
+      } else {
+        // no need to route successfully captured such files as we already own them
+      }
+    }
+    if (hadFailure) {
+      context->yield();
+      return;
+    }
+  }
+
   std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast < FlowFileRecord > (session->get());
 
   if (flow != nullptr) {
@@ -247,9 +270,8 @@
       context->yield();
       return;
     }
-
-    // remove the flowfile from the process session, it add to merge session later.
-    session->remove(flow);
+    // assuming ownership over the incoming flowFile
+    session->transfer(flow, Self);
   }
 
   // migrate bin to ready bin
@@ -266,18 +288,19 @@
   binManager_.getReadyBin(readyBins);
 
   // process the ready bin
-  if (!readyBins.empty()) {
+  while (!readyBins.empty()) {
     // create session for merge
+    // we have to create a new session
+    // for each merge as a rollback erases all
+    // previously added files
     core::ProcessSession mergeSession(context);
-    while (!readyBins.empty()) {
-      std::unique_ptr<Bin> bin = std::move(readyBins.front());
-      readyBins.pop_front();
-      // add bin's flows to the session
-      this->addFlowsToSession(context.get(), &mergeSession, bin);
-      logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
-      if (!this->processBin(context.get(), &mergeSession, bin))
-          this->transferFlowsToFail(context.get(), &mergeSession, bin);
-    }
+    std::unique_ptr<Bin> bin = std::move(readyBins.front());
+    readyBins.pop_front();
+    // add bin's flows to the session
+    this->addFlowsToSession(context.get(), &mergeSession, bin);
+    logger_->log_debug("BinFiles start to process bin %s for group %s", bin->getUUIDStr(), bin->getGroupId());
+    if (!this->processBin(context.get(), &mergeSession, bin))
+      this->transferFlowsToFail(context.get(), &mergeSession, bin);
     mergeSession.commit();
   }
 }
@@ -297,6 +320,43 @@
   }
 }
 
+void BinFiles::put(std::shared_ptr<core::Connectable> flow) {
+  auto flowFile = std::dynamic_pointer_cast<core::FlowFile>(flow);
+  if (!flowFile) return;
+  if (flowFile->getOriginalConnection()) {
+    // onTrigger assumed ownership over a FlowFile
+    // don't have to do anything
+    return;
+  }
+  // no original connection i.e. we are during restore
+  file_store_.put(flowFile);
+}
+
+void BinFiles::FlowFileStore::put(std::shared_ptr<core::FlowFile>& flowFile) {
+  {
+    std::lock_guard<std::mutex> guard(flow_file_mutex_);
+    incoming_files_.emplace(std::move(flowFile));
+  }
+  has_new_flow_file_.store(true, std::memory_order_release);
+}
+
+std::unordered_set<std::shared_ptr<core::FlowFile>> BinFiles::FlowFileStore::getNewFlowFiles() {
+  bool hasNewFlowFiles = true;
+  if (!has_new_flow_file_.compare_exchange_strong(hasNewFlowFiles, false, std::memory_order_acquire, std::memory_order_relaxed)) {
+    return {};
+  }
+  std::lock_guard<std::mutex> guard(flow_file_mutex_);
+  return std::move(incoming_files_);
+}
+
+std::set<std::shared_ptr<core::Connectable>> BinFiles::getOutGoingConnections(const std::string &relationship) const {
+  auto result = core::Connectable::getOutGoingConnections(relationship);
+  if (relationship == Self.getName()) {
+    result.insert(std::static_pointer_cast<core::Connectable>(std::const_pointer_cast<core::Processor>(shared_from_this())));
+  }
+  return result;
+}
+
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index 098ac0e..f8b89ea 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -194,7 +194,7 @@
   bool offer(const std::string &group, std::shared_ptr<core::FlowFile> flow);
   // gather ready bins once the bin are full enough or exceed bin age
   void gatherReadyBins();
-  // remove oldest bin
+  // marks oldest bin as ready
   void removeOldestBin();
   // get ready bin from binManager
   void getReadyBin(std::deque<std::unique_ptr<Bin>> &retBins);
@@ -218,6 +218,8 @@
 
 // BinFiles Class
 class BinFiles : public core::Processor {
+ protected:
+  static core::Relationship Self;
  public:
   // Constructor
   /*!
@@ -262,14 +264,18 @@
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
   // OnTrigger method, implemented by NiFi BinFiles
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
   }
   // OnTrigger method, implemented by NiFi BinFiles
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi BinFiles
-  virtual void initialize(void);
+  void initialize(void) override;
+
+  void put(std::shared_ptr<core::Connectable> flow) override;
+
+  std::set<std::shared_ptr<core::Connectable>> getOutGoingConnections(const std::string &relationship) const override;
 
  protected:
   // Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId().
@@ -284,14 +290,29 @@
   }
   // transfer flows to failure in bin
   void transferFlowsToFail(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
-  // add flows to session
+  // moves owned flows to session
   void addFlowsToSession(core::ProcessContext *context, core::ProcessSession *session, std::unique_ptr<Bin> &bin);
 
   BinManager binManager_;
 
  private:
+  class FlowFileStore{
+   public:
+    /**
+     * Returns the already-preprocessed FlowFiles that got restored on restart from the FlowFileRepository
+     * @return the resurrected persisted FlowFiles
+     */
+    std::unordered_set<std::shared_ptr<core::FlowFile>> getNewFlowFiles();
+    void put(std::shared_ptr<core::FlowFile>& flowFile);
+   private:
+    std::atomic_bool has_new_flow_file_{false};
+    std::mutex flow_file_mutex_;
+    std::unordered_set<std::shared_ptr<core::FlowFile>> incoming_files_;
+  };
+
   std::shared_ptr<logging::Logger> logger_;
   int maxBinCount_;
+  FlowFileStore file_store_;
 };
 
 REGISTER_RESOURCE(BinFiles, "Bins flow files into buckets based on the number of entries or size of entries");
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 3512939..c6bd9df 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -110,6 +110,8 @@
     return;
   }
 
+  session->remove(flowFile);
+
   std::string compressFormat = compressFormat_;
   if (compressFormat_ == COMPRESSION_FORMAT_ATTRIBUTE) {
     std::string attr;
@@ -202,7 +204,6 @@
     }
     logger_->log_debug("Compress Content processing success for the flow with UUID %s name %s", processFlowFile->getUUIDStr(), fileName);
     session->transfer(processFlowFile, Success);
-    session->remove(flowFile);
   }
 }
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 127ce9b..5184c7c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -81,10 +81,10 @@
     return;  // Stop here - don't delete from content repo while we have records in FF repo
   }
 
-  if (nullptr != content_repo_) {
+  if (content_repo_) {
     for (const auto &ffr : purgeList) {
       auto claim = ffr->getResourceClaim();
-      if (claim != nullptr) {
+      if (claim) {
         content_repo_->removeIfOrphaned(claim);
       }
     }
@@ -148,22 +148,27 @@
     std::string key = it->key().ToString();
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) {
       logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-      auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (!corrupt_checkpoint && search != connectionMap.end()) {
+      bool found = false;
+      auto search = containers.find(eventRead->getConnectionUuid());
+      found = (search != containers.end());
+      if (!found) {
+        // for backward compatibility
+        search = connectionMap.find(eventRead->getConnectionUuid());
+        found = (search != connectionMap.end());
+      }
+      if (!corrupt_checkpoint && found) {
         // we find the connection for the persistent flowfile, create the flowfile and enqueue that
         std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
         eventRead->setStoredToRepository(true);
         search->second->put(eventRead);
       } else {
         logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-        if (eventRead->getContentFullPath().length() > 0) {
-          if (nullptr != eventRead->getResourceClaim()) {
-            content_repo_->remove(eventRead->getResourceClaim());
-          }
-        }
+        auto claim = eventRead->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();
         keys_to_delete.enqueue(key);
       }
     } else {
+      // failed to deserialize FlowFile, cannot clear claim
       keys_to_delete.enqueue(key);
     }
   }
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 0d89355..43ace43 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -103,6 +103,8 @@
     return;
   }
 
+  session->remove(flowFile);
+
   std::string directory;
 
   if (!context->getProperty(Directory, directory, flowFile)) {
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 8969310..6451fc9 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -154,7 +154,7 @@
     return queued_data_size_;
   }
   void put(std::shared_ptr<core::Connectable> flow) override {
-    std::shared_ptr<core::FlowFile> ff = std::static_pointer_cast<core::FlowFile>(flow);
+    std::shared_ptr<core::FlowFile> ff = std::dynamic_pointer_cast<core::FlowFile>(flow);
     if (nullptr != ff) {
       put(ff);
     }
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 666b6d2..1fa4c5d 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -160,7 +160,7 @@
   }
 
   const std::string getContentFullPath() {
-    return content_full_fath_;
+    return claim_ ? claim_->getContentFullPath() : "";
   }
 
   /**
@@ -175,8 +175,6 @@
  protected:
   // connection uuid
   std::string uuid_connection_;
-  // Full path to the content
-  std::string content_full_fath_;
 
   // Local flow sequence ID
   static std::atomic<uint64_t> local_flow_seq_number_;
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 212e651..badea9f 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -77,7 +77,7 @@
    * Get outgoing connection based on relationship
    * @return set of outgoing connections.
    */
-  std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
+  virtual std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
 
   virtual void put(std::shared_ptr<Connectable> flow) {
   }
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index 563a8e7..ce64873 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 
 #include "utils/TimeUtil.h"
 #include "ResourceClaim.h"
@@ -35,9 +36,67 @@
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      // with refcount manipulation we should always increment first, then decrement as this way we don't accidentally
+      // discard the object under ourselves, note that an equality check will not suffice as two ResourceClaim
+      // instances can reference the same file (they could have the same contentPath)
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);
+    }
+
+   private:
+    /*
+     * We are aiming for the constraint that all FlowFiles should have a non-null claim pointer,
+     * unfortunately, for now, some places (e.g. ProcessSession::create) violate this constraint.
+     * We should indicate an empty or invalid content with special claims like
+     * InvalidResourceClaim and EmptyResourceClaim.
+     */
+    std::shared_ptr<ResourceClaim> claim_;
+  };
+
  public:
   FlowFile();
-  ~FlowFile();
+  ~FlowFile() override;
   FlowFile& operator=(const FlowFile& other);
 
   /**
@@ -48,7 +107,7 @@
   /**
    * Sets _claim to the inbound claim argument
    */
-  void setResourceClaim(std::shared_ptr<ResourceClaim> &claim);
+  void setResourceClaim(const std::shared_ptr<ResourceClaim>& claim);
 
   /**
    * clear the resource claim
@@ -59,22 +118,22 @@
    * Returns a pointer to this flow file record's
    * claim at the given stash key
    */
-  std::shared_ptr<ResourceClaim> getStashClaim(const std::string &key);
+  std::shared_ptr<ResourceClaim> getStashClaim(const std::string& key);
 
   /**
    * Sets the given stash key to the inbound claim argument
    */
-  void setStashClaim(const std::string &key, const std::shared_ptr<ResourceClaim> &claim);
+  void setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim);
 
   /**
    * Clear the resource claim at the given stash key
    */
-  void clearStashClaim(const std::string &key);
+  void clearStashClaim(const std::string& key);
 
   /**
    * Return true if the given stash claim exists
    */
-  bool hasStashClaim(const std::string &key);
+  bool hasStashClaim(const std::string& key);
 
   /**
    * Decrease the flow file record owned count for the resource claim and, if 
@@ -85,7 +144,7 @@
   /**
    * Get lineage identifiers
    */
-  std::set<std::string> &getlineageIdentifiers();
+  std::set<std::string>& getlineageIdentifiers();
 
   /**
    * Returns whether or not this flow file record
@@ -134,7 +193,7 @@
    * @param value value to set
    * @return result of finding key
    */
-  bool getAttribute(std::string key, std::string &value) const;
+  bool getAttribute(std::string key, std::string& value) const;
 
   /**
    * Updates the value in the attribute map that corresponds
@@ -155,7 +214,7 @@
   /**
    * setAttribute, if attribute already there, update it, else, add it
    */
-  void setAttribute(const std::string &key, const std::string &value) {
+  void setAttribute(const std::string& key, const std::string& value) {
     attributes_[key] = value;
   }
 
@@ -179,7 +238,7 @@
    * adds an attribute if it does not exist
    *
    */
-  bool addAttribute(const std::string &key, const std::string &value);
+  bool addAttribute(const std::string& key, const std::string& value);
 
   /**
    * Set the size of this record.
@@ -220,7 +279,7 @@
    */
   uint64_t getOffset() const;
 
-  bool getUUID(utils::Identifier &other) {
+  bool getUUID(utils::Identifier& other) {
     other = uuid_;
     return true;
   }
@@ -237,12 +296,12 @@
   /**
    * Yield
    */
-  virtual void yield() {
+  void yield() override {
   }
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() override {
     return true;
   }
 
@@ -250,7 +309,7 @@
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
@@ -258,13 +317,13 @@
    * Sets the original connection with a shared pointer.
    * @param connection shared connection.
    */
-  void setConnection(std::shared_ptr<core::Connectable> &connection);
+  void setConnection(std::shared_ptr<core::Connectable>& connection);
 
   /**
    * Sets the original connection with a shared pointer.
    * @param connection shared connection.
    */
-  void setConnection(std::shared_ptr<core::Connectable> &&connection);
+  void setConnection(std::shared_ptr<core::Connectable>&& connection);
 
   /**
    * Returns the connection referenced by this record.
@@ -275,7 +334,7 @@
    * Sets the original connection with a shared pointer.
    * @param connection shared connection.
    */
-  void setOriginalConnection(std::shared_ptr<core::Connectable> &connection);
+  void setOriginalConnection(std::shared_ptr<core::Connectable>& connection);
   /**
    * Returns the original connection referenced by this record.
    * @return shared original connection pointer.
@@ -314,9 +373,9 @@
   // Attributes key/values pairs for the flow record
   std::map<std::string, std::string> attributes_;
   // Pointer to the associated content resource claim
-  std::shared_ptr<ResourceClaim> claim_;
+  FlowFileOwnedResourceClaimPtr claim_;
   // Pointers to stashed content resource claims
-  std::map<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
+  std::map<std::string, FlowFileOwnedResourceClaimPtr> stashedContent_;
   // UUID string
   // std::string uuid_str_;
   // UUID string for all parents
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 370426a..1f7a90a 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -228,6 +228,8 @@
 
   void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap);
 
+  void getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const;
+
   void drainConnections();
 
   std::size_t getTotalFlowFileCount() const;
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index b98eb85..e6e4f23 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -59,10 +59,10 @@
     provenance_report_ = std::make_shared<provenance::ProvenanceReporter>(repo, process_context_->getProcessorNode()->getName(), process_context_->getProcessorNode()->getName());
   }
 
-// Destructor
+  // Destructor
   virtual ~ProcessSession();
 
-// Commit the session
+  // Commit the session
   void commit();
   // Roll Back the session
   void rollback();
@@ -70,7 +70,6 @@
   std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() {
     return provenance_report_;
   }
-  //
   // Get the FlowFile from the highest priority queue
   virtual std::shared_ptr<core::FlowFile> get();
   // Create a new UUID FlowFile with no content resource claim and without parent
@@ -81,7 +80,7 @@
   std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile> &parent);
   // Add a FlowFile to the session
   virtual void add(const std::shared_ptr<core::FlowFile> &flow);
-// Clone a new UUID FlowFile from parent both for content resource claim and attributes
+  // Clone a new UUID FlowFile from parent both for content resource claim and attributes
   std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent);
   // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
   std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size);
@@ -140,21 +139,24 @@
   ProcessSession &operator=(const ProcessSession &parent) = delete;
 
  protected:
-// FlowFiles being modified by current process session
-  std::map<std::string, std::shared_ptr<core::FlowFile> > _updatedFlowFiles;
+  // FlowFiles being modified by current process session
+  std::map<std::string, std::shared_ptr<core::FlowFile>> _updatedFlowFiles;
   // Copy of the original FlowFiles being modified by current process session as above
-  std::map<std::string, std::shared_ptr<core::FlowFile> > _originalFlowFiles;
+  std::map<std::string, std::shared_ptr<core::FlowFile>> _flowFileSnapShots;
   // FlowFiles being added by current process session
-  std::map<std::string, std::shared_ptr<core::FlowFile> > _addedFlowFiles;
+  std::map<std::string, std::shared_ptr<core::FlowFile>> _addedFlowFiles;
   // FlowFiles being deleted by current process session
-  std::map<std::string, std::shared_ptr<core::FlowFile> > _deletedFlowFiles;
+  std::map<std::string, std::shared_ptr<core::FlowFile>> _deletedFlowFiles;
   // FlowFiles being transfered to the relationship
   std::map<std::string, Relationship> _transferRelationship;
   // FlowFiles being cloned for multiple connections per relationship
-  std::map<std::string, std::shared_ptr<core::FlowFile> > _clonedFlowFiles;
+  std::map<std::string, std::shared_ptr<core::FlowFile>> _clonedFlowFiles;
 
  private:
-// Clone the flow file during transfer to multiple connections for a relationship
+  void persistFlowFilesBeforeTransfer(
+      std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile>>>& transactionMap,
+      const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots);
+  // Clone the flow file during transfer to multiple connections for a relationship
   std::shared_ptr<core::FlowFile> cloneDuringTransfer(std::shared_ptr<core::FlowFile> &parent);
   // ProcessContext
   std::shared_ptr<ProcessContext> process_context_;
diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h
index dea7212..78ec5ec 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -215,8 +215,6 @@
   // Whether flow file queue full in any of the outgoin connection
   bool flowFilesOutGoingFull();
 
-  // Get outgoing connections based on relationship name
-  std::set<std::shared_ptr<Connection> > getOutGoingConnections(std::string relationship);
   // Add connection
   bool addConnection(std::shared_ptr<Connectable> connection);
   // Remove connection
diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h
index 671b468..b52405c 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -54,7 +54,7 @@
     return processor_;
   }
 
-  void yield() {
+  void yield() override {
     processor_->yield();
   }
 
@@ -107,7 +107,7 @@
    * Returns theflow version
    * @returns flow version. can be null if a flow version is not tracked.
    */
-  virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const {
+  std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const override {
     if (processor_ != nullptr) {
       return processor_->getFlowIdentifier();
     } else {
@@ -224,7 +224,7 @@
    * Get outgoing connection based on relationship
    * @return set of outgoing connections.
    */
-  std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship) {
+  std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string& relationship) const override {
     return processor_->getOutGoingConnections(relationship);
   }
 
@@ -236,7 +236,7 @@
     return processor_->getNextIncomingConnection();
   }
 
-  std::shared_ptr<Connectable> pickIncomingConnection() {
+  std::shared_ptr<Connectable> pickIncomingConnection() override {
     return processor_->pickIncomingConnection();
   }
 
@@ -269,7 +269,7 @@
   }
 
 // Get Process Name
-  std::string getName() const {
+  std::string getName() const override {
     return processor_->getName();
   }
 
@@ -281,18 +281,18 @@
     processor_->setMaxConcurrentTasks(tasks);
   }
 
-  virtual bool supportsDynamicProperties() {
+  bool supportsDynamicProperties() override {
     return false;
   }
 
-  virtual bool isRunning();
+  bool isRunning() override;
 
-  virtual bool isWorkAvailable();
+  bool isWorkAvailable() override;
 
   virtual ~ProcessorNode();
 
  protected:
-  virtual bool canEdit() {
+  bool canEdit() override {
     return !processor_->isRunning();
   }
 
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index e44c479..0741323 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -117,6 +117,10 @@
     this->connectionMap = connectionMap;
   }
 
+  void setContainers(std::map<std::string, std::shared_ptr<core::Connectable>> &containers) {
+    this->containers = containers;
+  }
+
   virtual bool Get(const std::string &key, std::string &value) {
     return false;
   }
@@ -228,6 +232,8 @@
   Repository &operator=(const Repository &parent) = delete;
 
  protected:
+  std::map<std::string, std::shared_ptr<core::Connectable>> containers;
+
   std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap;
   // Mutex for protection
   std::mutex mutex_;
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 06cfcce..334d960 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -148,17 +148,6 @@
     logger_->log_debug("Enqueue flow file UUID %s to connection %s", flow->getUUIDStr(), name_);
   }
 
-  if (!flow->isStored()) {
-    // Save to the flowfile repo
-    FlowFileRecord event(flow_repository_, content_repo_, flow, this->uuidStr_);
-    if (event.Serialize()) {
-      flow->setStoredToRepository(true);
-    } else {
-      logger_->log_error("Failed to serialize FlowFileRecord to repo!");
-      throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfile to repository");
-    }
-  }
-
   // Notify receiving processor that work may be available
   if (dest_connectable_) {
     logger_->log_debug("Notifying %s that %s was inserted", dest_connectable_->getName(), flow->getUUIDStr());
@@ -167,8 +156,6 @@
 }
 
 void Connection::multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows) {
-  std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
-
   {
     std::lock_guard<std::mutex> lock(mutex_);
 
@@ -182,35 +169,12 @@
       queued_data_size_ += ff->getSize();
 
       logger_->log_debug("Enqueue flow file UUID %s to connection %s", ff->getUUIDStr(), name_);
-
-      if (!ff->isStored()) {
-        // Save to the flowfile repo
-        FlowFileRecord event(flow_repository_, content_repo_, ff, this->uuidStr_);
-
-        std::unique_ptr<io::DataStream> stramptr(new io::DataStream());
-        event.Serialize(*stramptr.get());
-
-        flowData.emplace_back(event.getUUIDStr(), std::move(stramptr));
-      }
     }
   }
 
-  if (!flow_repository_->MultiPut(flowData)) {
-    logger_->log_error("Failed execute multiput on FF repo!");
-    throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
-  }
-
-  for (auto& ff : flows) {
-    if (drop_empty_ && ff->getSize() == 0) {
-      continue;
-    }
-
-    ff->setStoredToRepository(true);
-
-    if (dest_connectable_) {
-      logger_->log_debug("Notifying %s that flowfiles were inserted", dest_connectable_->getName());
-      dest_connectable_->notifyWork();
-    }
+  if (dest_connectable_) {
+    logger_->log_debug("Notifying %s that flowfiles were inserted", dest_connectable_->getName());
+    dest_connectable_->notifyWork();
   }
 }
 
@@ -228,9 +192,6 @@
         // Flow record expired
         expiredFlowRecords.insert(item);
         logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-        if (flow_repository_->Delete(item->getUUIDStr())) {
-          item->setStoredToRepository(false);
-        }
       } else {
         // Flow record not expired
         if (item->isPenalized()) {
@@ -268,10 +229,12 @@
   while (!queue_.empty()) {
     std::shared_ptr<core::FlowFile> item = queue_.front();
     queue_.pop();
-    logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
+    logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
     if (delete_permanently) {
-      if (flow_repository_->Delete(item->getUUIDStr())) {
+      if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
         item->setStoredToRepository(false);
+        auto claim = item->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();
       }
     }
   }
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 80a0102..da1c676 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -405,10 +405,13 @@
   if (this->flow_file_repo_ != nullptr) {
     logger_->log_debug("Getting connection map");
     std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap;
+    std::map<std::string, std::shared_ptr<core::Connectable>> containers;
     if (this->root_ != nullptr) {
       this->root_->getConnections(connectionMap);
+      this->root_->getFlowFileContainers(containers);
     }
     flow_file_repo_->setConnectionMap(connectionMap);
+    flow_file_repo_->setContainers(containers);
     flow_file_repo_->loadComponent(content_repo_);
   } else {
     logger_->log_debug("Flow file repository is not set");
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 6dbbd75..0d66412 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -17,7 +17,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "FlowFileRecord.h"
 #include <time.h>
 #include <cstdio>
 #include <vector>
@@ -27,6 +26,8 @@
 #include <string>
 #include <iostream>
 #include <fstream>
+#include <cinttypes>
+#include "FlowFileRecord.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/Relationship.h"
 #include "core/Repository.h"
@@ -45,7 +46,7 @@
       content_repo_(content_repo),
       flow_repository_(flow_repository) {
   id_ = local_flow_seq_number_.load();
-  claim_ = claim;
+  claim_.set(*this, claim);
   // Increase the local ID for the flow record
   ++local_flow_seq_number_;
   // Populate the default attributes
@@ -59,12 +60,6 @@
   }
 
   snapshot_ = false;
-
-  if (claim_ != nullptr) {
-    // Increase the flow file record owned count for the resource claim
-    claim_->increaseFlowFileRecordOwnedCount();
-    content_full_fath_ = claim->getContentFullPath();
-  }
 }
 
 FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::shared_ptr<core::FlowFile> &event,
@@ -82,10 +77,7 @@
   offset_ = event->getOffset();
   event->getUUID(uuid_);
   uuid_connection_ = uuidConnection;
-  if (event->getResourceClaim()) {
-    event->getResourceClaim()->increaseFlowFileRecordOwnedCount();
-    content_full_fath_ = event->getResourceClaim()->getContentFullPath();
-  }
+  claim_.set(*this, event->getResourceClaim());
   if (event->getFlowIdentifier()) {
     std::string attr;
     event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -102,6 +94,7 @@
       snapshot_(""),
       content_repo_(content_repo),
       flow_repository_(flow_repository) {
+  claim_.set(*this, event->getResourceClaim());
   if (event->getFlowIdentifier()) {
     std::string attr;
     event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -118,29 +111,26 @@
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
-      content_repo_->remove(claim_);
-    }
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt " "%" PRIu64, getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());
+  if (content_repo_ && content_repo_->removeIfOrphaned(claim)) {
+    logger_->log_debug("Deleted Resource Claim %s", claim->getContentFullPath());
   }
 }
 
@@ -187,7 +177,6 @@
 FlowFileRecord &FlowFileRecord::operator=(const FlowFileRecord &other) {
   core::FlowFile::operator=(other);
   uuid_connection_ = other.uuid_connection_;
-  content_full_fath_ = other.content_full_fath_;
   snapshot_ = other.snapshot_;
   return *this;
 }
@@ -260,7 +249,7 @@
     }
   }
 
-  ret = writeUTF(this->content_full_fath_, &outStream);
+  ret = writeUTF(this->getContentFullPath(), &outStream);
   if (ret <= 0) {
     return false;
   }
@@ -291,6 +280,8 @@
 
   if (flow_repository_->Put(uuidStr_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
     logger_->log_debug("NiFi FlowFile Store event %s size %llu success", uuidStr_, outStream.getSize());
+    // on behalf of the persisted record instance
+    if (claim_) claim_->increaseFlowFileRecordOwnedCount();
     return true;
   } else {
     logger_->log_error("NiFi FlowFile Store event %s size %llu fail", uuidStr_, outStream.getSize());
@@ -351,7 +342,8 @@
     this->attributes_[key] = value;
   }
 
-  ret = readUTF(this->content_full_fath_, &outStream);
+  std::string content_full_path;
+  ret = readUTF(content_full_path, &outStream);
   if (ret <= 0) {
     return false;
   }
@@ -366,9 +358,7 @@
     return false;
   }
 
-  if (nullptr == claim_) {
-    claim_ = std::make_shared<ResourceClaim>(content_full_fath_, content_repo_, true);
-  }
+  claim_.set(*this, std::make_shared<ResourceClaim>(content_full_path, content_repo_, true));
   return true;
 }
 
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 6b7ee2f..f94f2c9 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -64,9 +64,7 @@
   size_ = other.size_;
   penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
   attributes_ = other.attributes_;
-  claim_ = other.claim_;
-  if (claim_ != nullptr)
-    this->claim_->increaseFlowFileRecordOwnedCount();
+  claim_.set(*this, other.claim_);
   uuidStr_ = other.uuidStr_;
   connection_ = other.connection_;
   original_connection_ = other.original_connection_;
@@ -95,36 +93,39 @@
 }
 
 std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
-  return claim_;
+  return claim_.get();
 }
 
 void FlowFile::clearResourceClaim() {
-  claim_ = nullptr;
+  claim_.set(*this, nullptr);
 }
-void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) {
-  claim_ = claim;
+void FlowFile::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) {
+  claim_.set(*this, claim);
 }
 
-std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string &key) {
-  return stashedContent_[key];
+std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string& key) {
+  return stashedContent_[key].get();
 }
 
-void FlowFile::setStashClaim(const std::string &key, const std::shared_ptr<ResourceClaim> &claim) {
+void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) {
   if (hasStashClaim(key)) {
     logger_->log_warn("Stashing content of record %s to existing key %s; "
                       "existing content will be overwritten",
                       getUUIDStr().c_str(), key.c_str());
-    releaseClaim(getStashClaim(key));
   }
 
-  stashedContent_[key] = claim;
+  stashedContent_[key].set(*this, claim);
 }
 
-void FlowFile::clearStashClaim(const std::string &key) {
-  stashedContent_.erase(key);
+void FlowFile::clearStashClaim(const std::string& key) {
+  auto claimIt = stashedContent_.find(key);
+  if (claimIt != stashedContent_.end()) {
+    claimIt->second.set(*this, nullptr);
+    stashedContent_.erase(claimIt);
+  }
 }
 
-bool FlowFile::hasStashClaim(const std::string &key) {
+bool FlowFile::hasStashClaim(const std::string& key) {
   return stashedContent_.find(key) != stashedContent_.end();
 }
 
@@ -140,11 +141,11 @@
   return lineage_start_date_;
 }
 
-std::set<std::string> &FlowFile::getlineageIdentifiers() {
+std::set<std::string>& FlowFile::getlineageIdentifiers() {
   return lineage_Identifiers_;
 }
 
-bool FlowFile::getAttribute(std::string key, std::string &value) const {
+bool FlowFile::getAttribute(std::string key, std::string& value) const {
   auto it = attributes_.find(key);
   if (it != attributes_.end()) {
     value = it->second;
@@ -183,7 +184,7 @@
   }
 }
 
-bool FlowFile::addAttribute(const std::string &key, const std::string &value) {
+bool FlowFile::addAttribute(const std::string& key, const std::string& value) {
   auto it = attributes_.find(key);
   if (it != attributes_.end()) {
     // attribute already there in the map
@@ -202,7 +203,7 @@
  * Sets the original connection with a shared pointer.
  * @param connection shared connection.
  */
-void FlowFile::setOriginalConnection(std::shared_ptr<core::Connectable> &connection) {
+void FlowFile::setOriginalConnection(std::shared_ptr<core::Connectable>& connection) {
   original_connection_ = connection;
 }
 
@@ -210,7 +211,7 @@
  * Sets the connection with a shared pointer.
  * @param connection shared connection.
  */
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) {
+void FlowFile::setConnection(std::shared_ptr<core::Connectable>& connection) {
   connection_ = connection;
 }
 
@@ -218,7 +219,7 @@
  * Sets the connection with a shared pointer.
  * @param connection shared connection.
  */
-void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) {
+void FlowFile::setConnection(std::shared_ptr<core::Connectable>&& connection) {
   connection_ = connection;
 }
 
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 01e403e..28e6c9a 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -360,6 +360,20 @@
   }
 }
 
+void ProcessGroup::getFlowFileContainers(std::map<std::string, std::shared_ptr<Connectable>> &containers) const {
+  for (auto connection : connections_) {
+    containers[connection->getUUIDStr()] = connection;
+    containers[connection->getName()] = connection;
+  }
+  for (auto processor : processors_) {
+    // processors can also own FlowFiles
+    containers[processor->getUUIDStr()] = processor;
+  }
+  for (auto processGroup : child_process_groups_) {
+    processGroup->getFlowFileContainers(containers);
+  }
+}
+
 void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 67ab5e1..e14d812 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -88,7 +88,11 @@
 }
 
 void ProcessSession::add(const std::shared_ptr<core::FlowFile> &record) {
+  if (_updatedFlowFiles.find(record->getUUIDStr()) != _updatedFlowFiles.end()) {
+    throw Exception(ExceptionType::PROCESSOR_EXCEPTION, "Mustn't add file that was provided by this session");
+  }
   _addedFlowFiles[record->getUUIDStr()] = record;
+  record->setDeleted(false);
 }
 
 std::shared_ptr<core::FlowFile> ProcessSession::create(const std::shared_ptr<core::FlowFile> &parent) {
@@ -130,10 +134,9 @@
     // Copy Resource Claim
     std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
     record->setResourceClaim(parent_claim);
-    if (parent_claim != nullptr) {
+    if (parent_claim) {
       record->setOffset(parent->getOffset());
       record->setSize(parent->getSize());
-      record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
     }
     provenance_report_->clone(parent, record);
   }
@@ -170,10 +173,9 @@
     // Copy Resource Claim
     std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
     record->setResourceClaim(parent_claim);
-    if (parent_claim != nullptr) {
+    if (parent_claim) {
       record->setOffset(parent->getOffset());
       record->setSize(parent->getSize());
-      record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
     }
     provenance_report_->clone(parent, record);
   }
@@ -198,11 +200,7 @@
       record->setOffset(parent->getOffset() + offset);
       record->setSize(size);
       // Copy Resource Claim
-      std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim();
-      record->setResourceClaim(parent_claim);
-      if (parent_claim != nullptr) {
-        record->getResourceClaim()->increaseFlowFileRecordOwnedCount();
-      }
+      record->setResourceClaim(parent->getResourceClaim());
     }
     provenance_report_->clone(parent, record);
   }
@@ -211,15 +209,6 @@
 
 void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   flow->setDeleted(true);
-  if (flow->getResourceClaim() != nullptr) {
-    flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-    logger_->log_debug("Auto terminated %s %" PRIu64 " %s", flow->getResourceClaim()->getContentFullPath(), flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr());
-  } else {
-    logger_->log_debug("Flow does not contain content. no resource claim to decrement.");
-  }
-  if (_addedFlowFiles.find(flow->getUUIDStr()) == _addedFlowFiles.end()) {
-    process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr());
-  }
   _deletedFlowFiles[flow->getUUIDStr()] = flow;
   std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
   provenance_report_->drop(flow, reason);
@@ -248,6 +237,7 @@
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);
 }
 
 void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
@@ -255,28 +245,19 @@
 
   try {
     uint64_t startTime = getTimeMillis();
-    claim->increaseFlowFileRecordOwnedCount();
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
     // Call the callback to write the content
     if (nullptr == stream) {
-      claim->decreaseFlowFileRecordOwnedCount();
       rollback();
       return;
     }
     if (callback->process(stream) < 0) {
-      claim->decreaseFlowFileRecordOwnedCount();
       rollback();
       return;
     }
 
     flow->setSize(stream->getSize());
     flow->setOffset(0);
-    std::shared_ptr<ResourceClaim> flow_claim = flow->getResourceClaim();
-    if (flow_claim != nullptr) {
-      // Remove the old claim
-      flow_claim->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     flow->setResourceClaim(claim);
 
     stream->closeStream();
@@ -285,31 +266,21 @@
     uint64_t endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
-    if (flow && flow->getResourceClaim() == claim) {
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
   } catch (...) {
-    if (flow && flow->getResourceClaim() == claim) {
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     logger_->log_debug("Caught Exception during process session write");
     throw;
   }
 }
 
 void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
-  std::shared_ptr<ResourceClaim> claim = nullptr;
-  if (flow->getResourceClaim() == nullptr) {
+  std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
+  if (!claim) {
     // No existed claim for append, we need to create new claim
     return write(flow, callback);
   }
 
-  claim = flow->getResourceClaim();
-
   try {
     uint64_t startTime = getTimeMillis();
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
@@ -392,12 +363,10 @@
 
   try {
     auto startTime = getTimeMillis();
-    claim->increaseFlowFileRecordOwnedCount();
     std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
 
     if (nullptr == content_stream) {
       logger_->log_debug("Could not obtain claim for %s", claim->getContentFullPath());
-      claim->decreaseFlowFileRecordOwnedCount();
       rollback();
       return;
     }
@@ -414,11 +383,6 @@
 
     flow->setSize(content_stream->getSize());
     flow->setOffset(0);
-    if (flow->getResourceClaim() != nullptr) {
-      // Remove the old claim
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     flow->setResourceClaim(claim);
 
     logger_->log_debug("Import offset %" PRIu64 " length %" PRIu64 " into content %s for FlowFile UUID %s",
@@ -430,17 +394,9 @@
     auto endTime = getTimeMillis();
     provenance_report_->modifyContent(flow, details.str(), endTime - startTime);
   } catch (std::exception &exception) {
-    if (flow && flow->getResourceClaim() == claim) {
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
   } catch (...) {
-    if (flow && flow->getResourceClaim() == claim) {
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     logger_->log_debug("Caught Exception during process session write");
     throw;
   }
@@ -455,10 +411,8 @@
     auto startTime = getTimeMillis();
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-    claim->increaseFlowFileRecordOwnedCount();
     std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
     if (nullptr == stream) {
-      claim->decreaseFlowFileRecordOwnedCount();
       rollback();
       return;
     }
@@ -490,11 +444,6 @@
       if (!invalidWrite) {
         flow->setSize(stream->getSize());
         flow->setOffset(0);
-        if (flow->getResourceClaim() != nullptr) {
-          // Remove the old claim
-          flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-          flow->clearResourceClaim();
-        }
         flow->setResourceClaim(claim);
 
         logger_->log_debug("Import offset %" PRIu64 " length %" PRIu64 " into content %s for FlowFile UUID %s", flow->getOffset(), flow->getSize(), flow->getResourceClaim()->getContentFullPath(),
@@ -517,17 +466,9 @@
       throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
     }
   } catch (std::exception &exception) {
-    if (flow && flow->getResourceClaim() == claim) {
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
   } catch (...) {
-    if (flow && flow->getResourceClaim() == claim) {
-      flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flow->clearResourceClaim();
-    }
     logger_->log_debug("Caught Exception during process session write");
     throw;
   }
@@ -540,111 +481,97 @@
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
+
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
           startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
-          }
-
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << flowFile->getUUIDStr();
-          stream->closeStream();
-          std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
         }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
+          stream->closeStream();
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
+        }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << flowFile->getOffset() << " length " << flowFile->getSize() << " content " << flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }
+    logger_->log_debug("Caught Exception during process session write");
     throw;
   }
 }
@@ -700,39 +627,38 @@
 void ProcessSession::stash(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
   logger_->log_debug("Stashing content from %s to key %s", flow->getUUIDStr(), key);
 
-  if (!flow->getResourceClaim()) {
+  auto claim = flow->getResourceClaim();
+  if (!claim) {
     logger_->log_warn("Attempted to stash content of record %s when "
                       "there is no resource claim",
                       flow->getUUIDStr());
     return;
   }
 
-// Stash the claim
-  auto claim = flow->getResourceClaim();
+  // Stash the claim
   flow->setStashClaim(key, claim);
 
-// Clear current claim
+  // Clear current claim
   flow->clearResourceClaim();
 }
 
 void ProcessSession::restore(const std::string &key, const std::shared_ptr<core::FlowFile> &flow) {
   logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr(), key);
 
-// Restore the claim
+  // Restore the claim
   if (!flow->hasStashClaim(key)) {
     logger_->log_warn("Requested restore to record %s from unknown key %s", flow->getUUIDStr(), key);
     return;
   }
 
-// Disown current claim if existing
+  // Disown current claim if existing
   if (flow->getResourceClaim()) {
     logger_->log_warn("Restoring stashed content of record %s from key %s when there is "
                       "existing content; existing content will be overwritten",
                       flow->getUUIDStr(), key);
-    flow->releaseClaim(flow->getResourceClaim());
   }
 
-// Restore the claim
+  // Restore the claim
   auto stashClaim = flow->getStashClaim(key);
   flow->setResourceClaim(stashClaim);
   flow->clearStashClaim(key);
@@ -829,9 +755,9 @@
       }
     }
 
-    std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+    std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
 
-    std::shared_ptr<Connection> connection = nullptr;
+    std::shared_ptr<Connectable> connection = nullptr;
     // Complete process the added and update flow files for the session, send the flow file to its queue
     for (const auto &it : _updatedFlowFiles) {
       std::shared_ptr<core::FlowFile> record = it.second;
@@ -840,7 +766,7 @@
         continue;
       }
 
-      connection = std::static_pointer_cast<Connection>(record->getConnection());
+      connection = record->getConnection();
       if ((connection) != nullptr) {
         connectionQueues[connection].push_back(record);
       }
@@ -851,7 +777,7 @@
       if (record->isDeleted()) {
         continue;
       }
-      connection = std::static_pointer_cast<Connection>(record->getConnection());
+      connection = record->getConnection();
       if ((connection) != nullptr) {
         connectionQueues[connection].push_back(record);
       }
@@ -863,14 +789,41 @@
       if (record->isDeleted()) {
         continue;
       }
-      connection = std::static_pointer_cast<Connection>(record->getConnection());
+      connection = record->getConnection();
       if ((connection) != nullptr) {
         connectionQueues[connection].push_back(record);
       }
     }
 
+    for (const auto& it : _deletedFlowFiles) {
+        auto record = it.second;
+        if (!record->isDeleted()) {
+          continue;
+        }
+        if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
+          record->setStoredToRepository(false);
+          auto claim = record->getResourceClaim();
+          if (claim) {
+            claim->decreaseFlowFileRecordOwnedCount();
+            logger_->log_debug("Decrementing resource claim on behalf of the persisted instance %s %" PRIu64 " %s",
+                claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount(), record->getUUIDStr());
+          } else {
+            logger_->log_debug("Flow does not contain content. no resource claim to decrement.");
+          }
+        }
+    }
+
+    persistFlowFilesBeforeTransfer(connectionQueues, _flowFileSnapShots);
+
     for (auto& cq : connectionQueues) {
-      cq.first->multiPut(cq.second);
+      auto connection = std::dynamic_pointer_cast<Connection>(cq.first);
+      if (connection) {
+        connection->multiPut(cq.second);
+      } else {
+        for (auto& file : cq.second) {
+          cq.first->put(file);
+        }
+      }
     }
 
     // All done
@@ -878,7 +831,7 @@
     _addedFlowFiles.clear();
     _clonedFlowFiles.clear();
     _deletedFlowFiles.clear();
-    _originalFlowFiles.clear();
+    _flowFileSnapShots.clear();
 
     _transferRelationship.clear();
     // persistent the provenance report
@@ -894,14 +847,18 @@
 }
 
 void ProcessSession::rollback() {
-  std::map<std::shared_ptr<Connection>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
+  // new FlowFiles are only persisted during commit
+  // no need to delete them here
+  std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<FlowFile>>> connectionQueues;
 
   try {
-    std::shared_ptr<Connection> connection = nullptr;
-    // Requeue the snapshot of the flowfile back
-    for (const auto &it : _originalFlowFiles) {
+    std::shared_ptr<Connectable> connection = nullptr;
+    // Restore the flowFiles from the snapshot
+    for (const auto &it : _updatedFlowFiles) {
       std::shared_ptr<core::FlowFile> record = it.second;
-      connection = std::static_pointer_cast<Connection>(record->getOriginalConnection());
+      auto snaphost = _flowFileSnapShots[record->getUUIDStr()];
+      *record = *snaphost;
+      connection = record->getOriginalConnection();
       if ((connection) != nullptr) {
         std::shared_ptr<FlowFileRecord> flowf = std::static_pointer_cast<FlowFileRecord>(record);
         flowf->setSnapShot(false);
@@ -910,11 +867,23 @@
       }
     }
 
-    for (auto& cq : connectionQueues) {
-      cq.first->multiPut(cq.second);
+    for (const auto& it : _deletedFlowFiles) {
+      it.second->setDeleted(false);
     }
 
-    _originalFlowFiles.clear();
+    // put everything back where it came from
+    for (auto& cq : connectionQueues) {
+      auto connection = std::dynamic_pointer_cast<Connection>(cq.first);
+      if (connection) {
+        connection->multiPut(cq.second);
+      } else {
+        for (auto& flow : cq.second) {
+          cq.first->put(flow);
+        }
+      }
+    }
+
+    _flowFileSnapShots.clear();
 
     _clonedFlowFiles.clear();
     _addedFlowFiles.clear();
@@ -930,6 +899,71 @@
   }
 }
 
+void ProcessSession::persistFlowFilesBeforeTransfer(
+    std::map<std::shared_ptr<Connectable>, std::vector<std::shared_ptr<core::FlowFile> > >& transactionMap,
+    const std::map<std::string, std::shared_ptr<FlowFile>>& originalFlowFileSnapShots) {
+
+  std::vector<std::pair<std::string, std::unique_ptr<io::DataStream>>> flowData;
+
+  auto flowFileRepo = process_context_->getFlowFileRepository();
+  auto contentRepo = process_context_->getContentRepository();
+
+  for (auto& transaction : transactionMap) {
+    const std::shared_ptr<Connectable>& target = transaction.first;
+    std::shared_ptr<Connection> connection = std::dynamic_pointer_cast<Connection>(target);
+    const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
+    auto& flows = transaction.second;
+    for (auto &ff : flows) {
+      if (shouldDropEmptyFiles && ff->getSize() == 0) {
+        // the receiver will drop this FF
+        continue;
+      }
+      FlowFileRecord event(flowFileRepo, contentRepo, ff, target->getUUIDStr());
+
+      std::unique_ptr<io::DataStream> stream(new io::DataStream());
+      event.Serialize(*stream);
+
+      flowData.emplace_back(event.getUUIDStr(), std::move(stream));
+    }
+  }
+
+  if (!flowFileRepo->MultiPut(flowData)) {
+    logger_->log_error("Failed execute multiput on FF repo!");
+    throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
+  }
+
+  for (auto& transaction : transactionMap) {
+    const std::shared_ptr<Connectable>& target = transaction.first;
+    std::shared_ptr<Connection> connection = std::dynamic_pointer_cast<Connection>(target);
+    const bool shouldDropEmptyFiles = connection ? connection->getDropEmptyFlowFiles() : false;
+    auto& flows = transaction.second;
+    for (auto &ff : flows) {
+      auto snapshotIt = originalFlowFileSnapShots.find(ff->getUUIDStr());
+      auto original = snapshotIt != originalFlowFileSnapShots.end() ? snapshotIt->second : nullptr;
+      if (shouldDropEmptyFiles && ff->getSize() == 0) {
+        // the receiver promised to drop this FF, no need for it anymore
+        if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
+          // original must be non-null since this flowFile is already stored in the repos ->
+          // must have come from a session->get()
+          auto claim = original->getResourceClaim();
+          // decrement on behalf of the persisted-instance-to-be-deleted
+          if (claim) claim->decreaseFlowFileRecordOwnedCount();
+          ff->setStoredToRepository(false);
+        }
+        continue;
+      }
+      auto claim = ff->getResourceClaim();
+      // increment on behalf of the persisted instance
+      if (claim) claim->increaseFlowFileRecordOwnedCount();
+      auto originalClaim = original ? original->getResourceClaim() : nullptr;
+      // decrement on behalf of the overridden instance if any
+      if (ff->isStored() && originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
+
+      ff->setStoredToRepository(true);
+    }
+  }
+}
+
 std::shared_ptr<core::FlowFile> ProcessSession::get() {
   std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->pickIncomingConnection();
 
@@ -949,6 +983,12 @@
         std::stringstream details;
         details << process_context_->getProcessorNode()->getName() << " expire flow record " << record->getUUIDStr();
         provenance_report_->expire(record, details.str());
+        // there is no rolling back expired FlowFiles
+        if (record->isStored() && process_context_->getFlowFileRepository()->Delete(record->getUUIDStr())) {
+          record->setStoredToRepository(false);
+          auto claim = record->getResourceClaim();
+          if (claim) claim->decreaseFlowFileRecordOwnedCount();
+        }
       }
     }
     if (ret) {
@@ -964,9 +1004,9 @@
         snapshot->setAttribute(attr, flow_version->getFlowId());
       }
       logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr());
-      snapshot = ret;
+      *snapshot = *ret;
       // save a snapshot
-      _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
+      _flowFileSnapShots[snapshot->getUUIDStr()] = snapshot;
       return ret;
     }
     current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode()->pickIncomingConnection());
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index 6ce3123..9556000 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -238,7 +238,8 @@
     // We already has connection for this relationship
     std::set<std::shared_ptr<Connectable>> existedConnection = connection_pair.second;
     const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const std::shared_ptr<Connectable>& conn) {
-      return std::static_pointer_cast<Connection>(conn)->isFull();
+      auto connection = std::dynamic_pointer_cast<Connection>(conn);
+      return connection && connection->isFull();
     });
     if (has_full_connection) { return true; }
   }
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
new file mode 100644
index 0000000..f40a180
--- /dev/null
+++ b/libminifi/test/BufferReader.h
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_BUFFERREADER_H
+#define NIFI_MINIFI_CPP_BUFFERREADER_H
+
+#include "FlowFileRecord.h"
+
+class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
+ public:
+  explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
+
+  int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+    uint8_t tmpBuffer[4096]{};
+    std::size_t remaining_len = len;
+    int total_read = 0;
+    while (remaining_len > 0) {
+      auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
+      if (ret == 0) break;
+      if (ret < 0) return ret;
+      remaining_len -= ret;
+      total_read += ret;
+      auto prevSize = buffer_.size();
+      buffer_.resize(prevSize + ret);
+      std::move(tmpBuffer, tmpBuffer + ret, buffer_.data() + prevSize);
+    }
+    return total_read;
+  }
+
+  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+    return write(*stream.get(), stream->getSize());
+  }
+
+ private:
+  std::vector<uint8_t>& buffer_;
+};
+
+#endif //NIFI_MINIFI_CPP_BUFFERREADER_H
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index e6920fb..0173ba2 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -464,6 +464,7 @@
   {
     auto session = std::make_shared<core::ProcessSession>(context);
     processor->onTrigger(context, session);
+    session->commit();
   }
   // validate the merge content
   std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -751,7 +752,3 @@
   }
   LogTestController::getInstance().reset();
 }
-
-
-
-
diff --git a/libminifi/test/flow-tests/CMakeLists.txt b/libminifi/test/flow-tests/CMakeLists.txt
index 905b38b..e377aff 100644
--- a/libminifi/test/flow-tests/CMakeLists.txt
+++ b/libminifi/test/flow-tests/CMakeLists.txt
@@ -34,7 +34,7 @@
 
     target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
     MATH(EXPR FLOW_TEST_COUNT "${FLOW_TEST_COUNT}+1")
-    add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}")
 ENDFOREACH()
 message("-- Finished building ${FLOW_TEST_COUNT} flow test file(s)...")
 
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 6bd6699..903a384 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -124,11 +124,11 @@
   auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
   auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
 
-  // prevent the initial trigger
-  // in case the source got triggered
-  // and the scheduler triggers the sink
-  // before we could initiate the shutdown
-  sinkProc->yield(100);
+  std::promise<void> execSinkPromise;
+  std::future<void> execSinkFuture = execSinkPromise.get_future();
+  sinkProc->onTriggerCb_ = [&] {
+    execSinkFuture.wait();
+  };
 
   testController.startFlow();
 
@@ -140,8 +140,8 @@
 
   REQUIRE(root->getTotalFlowFileCount() == 3);
   REQUIRE(sourceProc->trigger_count.load() == 1);
-  REQUIRE(sinkProc->trigger_count.load() == 0);
 
+  execSinkPromise.set_value();
   controller->stop(true);
 
   REQUIRE(sourceProc->trigger_count.load() == 1);
@@ -158,12 +158,10 @@
   auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
   auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
 
-  // prevent the initial trigger
-  // in case the source got triggered
-  // and the scheduler triggers the sink
-  sinkProc->yield(100);
-
+  std::promise<void> execSinkPromise;
+  std::future<void> execSinkFuture = execSinkPromise.get_future();
   sinkProc->onTriggerCb_ = [&]{
+    execSinkFuture.wait();
     static std::atomic<bool> first_onTrigger{true};
     bool isFirst = true;
     // sleep only on the first trigger
@@ -182,8 +180,8 @@
 
   REQUIRE(root->getTotalFlowFileCount() == 3);
   REQUIRE(sourceProc->trigger_count.load() == 1);
-  REQUIRE(sinkProc->trigger_count.load() == 0);
 
+  execSinkPromise.set_value();
   controller->stop(true);
 
   REQUIRE(sourceProc->trigger_count.load() == 1);
@@ -202,12 +200,10 @@
   auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessor("Generator"));
   auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessor("TestProcessor"));
 
-  // prevent the initial trigger
-  // in case the source got triggered
-  // and the scheduler triggers the sink
-  sinkProc->yield(100);
-
+  std::promise<void> execSinkPromise;
+  std::future<void> execSinkFuture = execSinkPromise.get_future();
   sinkProc->onTriggerCb_ = [&]{
+    execSinkFuture.wait();
     static std::atomic<bool> first_onTrigger{true};
     bool isFirst = true;
     // sleep only on the first trigger
@@ -226,9 +222,9 @@
 
   REQUIRE(root->getTotalFlowFileCount() == 3);
   REQUIRE(sourceProc->trigger_count.load() == 1);
-  REQUIRE(sinkProc->trigger_count.load() == 0);
 
   std::thread shutdownThread([&]{
+    execSinkPromise.set_value();
     controller->stop(true);
   });
 
diff --git a/libminifi/test/flow-tests/TestControllerWithFlow.h b/libminifi/test/flow-tests/TestControllerWithFlow.h
index aee610d..b91c849 100644
--- a/libminifi/test/flow-tests/TestControllerWithFlow.h
+++ b/libminifi/test/flow-tests/TestControllerWithFlow.h
@@ -56,6 +56,8 @@
   }
 
   ~TestControllerWithFlow() {
+    controller_->stop(true);
+    controller_->unload();
     LogTestController::getInstance().reset();
   }
 
diff --git a/libminifi/test/persistence-tests/CMakeLists.txt b/libminifi/test/persistence-tests/CMakeLists.txt
new file mode 100644
index 0000000..a2ae85c
--- /dev/null
+++ b/libminifi/test/persistence-tests/CMakeLists.txt
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB PERSISTENCE_TESTS  "*.cpp")
+SET(PERSISTENCE_TEST_COUNT 0)
+FOREACH(testfile ${PERSISTENCE_TESTS})
+	get_filename_component(testfilename "${testfile}" NAME_WE)
+	add_executable("${testfilename}" "${testfile}")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/libarchive")
+	target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/rocksdb-repos/")
+	target_include_directories(${testfilename} BEFORE PRIVATE "${ROCKSDB_THIRDPARTY_ROOT}/include")
+	target_wholearchive_library(${testfilename} minifi-archive-extensions)
+	target_wholearchive_library(${testfilename} minifi-standard-processors)
+	target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+	createTests("${testfilename}")
+	target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+	MATH(EXPR PERSISTENCE_TEST_COUNT "${PERSISTENCE_TEST_COUNT}+1")
+	add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${ARCHIVE-PERSISTENCE_TEST_COUNT} Persistence related test file(s)...")
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
new file mode 100644
index 0000000..b032410
--- /dev/null
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -0,0 +1,319 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "FlowFileRepository.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "../../extensions/libarchive/MergeContent.h"
+#include "../test/BufferReader.h"
+
+using Connection = minifi::Connection;
+using MergeContent = minifi::processors::MergeContent;
+
+struct TestFlow{
+  TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
+        const std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
+      : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+
+    // setup processor
+    {
+      processor = processorGenerator(mainProcUUID());
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+      processorContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+    }
+
+    // setup INPUT processor
+    {
+      inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
+      inputContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo,
+                                                            ff_repository, content_repo);
+    }
+
+    // setup Input Connection
+    {
+      input = std::make_shared<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
+      input->setRelationship({"input", "d"});
+      input->setDestinationUUID(mainProcUUID());
+      input->setSourceUUID(inputProcUUID());
+      inputProcessor->addConnection(input);
+    }
+
+    // setup Output Connection
+    {
+      output = std::make_shared<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
+      output->setRelationship(relationshipToOutput);
+      output->setSourceUUID(mainProcUUID());
+    }
+
+    // setup ProcessGroup
+    {
+      root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+      root->addProcessor(processor);
+      root->addConnection(input);
+      root->addConnection(output);
+    }
+
+    // prepare Merge Processor for execution
+    processor->setScheduledState(core::ScheduledState::RUNNING);
+    processor->onSchedule(processorContext.get(), new core::ProcessSessionFactory(processorContext));
+  }
+  std::shared_ptr<core::FlowFile> write(const std::string& data) {
+    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+    core::ProcessSession sessionGenFlowFile(inputContext);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(stream, flow);
+    assert(flow->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
+    sessionGenFlowFile.transfer(flow, {"input", "d"});
+    sessionGenFlowFile.commit();
+    return flow;
+  }
+  std::string read(const std::shared_ptr<core::FlowFile>& file) {
+    core::ProcessSession session(processorContext);
+    std::vector<uint8_t> buffer;
+    BufferReader reader(buffer);
+    session.read(file, &reader);
+    return {buffer.data(), buffer.data() + buffer.size()};
+  }
+  void trigger() {
+    auto session = std::make_shared<core::ProcessSession>(processorContext);
+    processor->onTrigger(processorContext, session);
+    session->commit();
+  }
+
+  std::shared_ptr<Connection> input;
+  std::shared_ptr<Connection> output;
+  std::shared_ptr<core::ProcessGroup> root;
+
+ private:
+  static utils::Identifier& mainProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+  static utils::Identifier& inputProcUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+  static utils::Identifier& inputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+  static utils::Identifier& outputConnUUID() {static auto id = utils::IdGenerator::getIdGenerator()->generate(); return id;}
+
+  std::shared_ptr<core::Processor> inputProcessor;
+  std::shared_ptr<core::Processor> processor;
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository;
+  std::shared_ptr<core::ContentRepository> content_repo;
+  std::shared_ptr<core::Repository> prov_repo;
+  std::shared_ptr<core::ProcessContext> inputContext;
+  std::shared_ptr<core::ProcessContext> processorContext;
+};
+
+std::shared_ptr<MergeContent> setupMergeProcessor(const utils::Identifier& id) {
+  auto processor = std::make_shared<MergeContent>("MergeContent", id);
+  processor->initialize();
+  processor->setAutoTerminatedRelationships({{"original", "d"}});
+
+  processor->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+  processor->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+  processor->setProperty(MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+  processor->setProperty(MergeContent::MinEntries, "3");
+  processor->setProperty(MergeContent::Header, "_Header_");
+  processor->setProperty(MergeContent::Footer, "_Footer_");
+  processor->setProperty(MergeContent::Demarcator, "_Demarcator_");
+  processor->setProperty(MergeContent::MaxBinAge, "1 h");
+
+  return processor;
+}
+
+TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/tmp/test.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  {
+    TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
+
+    flowController->load(flow.root);
+    ff_repository->start();
+
+    // write two files into the input
+    flow.write("one");
+    flow.write("two");
+    // capture them with the Merge Processor
+    flow.trigger();
+    flow.trigger();
+
+    ff_repository->stop();
+    flowController->unload();
+
+    // check if the processor has taken ownership
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    auto file = flow.input->poll(expired);
+    REQUIRE(!file);
+    REQUIRE(expired.empty());
+
+    file = flow.output->poll(expired);
+    REQUIRE(!file);
+    REQUIRE(expired.empty());
+  }
+
+  // swap the ProcessGroup and restart the FlowController
+  {
+    TestFlow flow(ff_repository, content_repo, prov_repo, setupMergeProcessor, MergeContent::Merge);
+
+    flowController->load(flow.root);
+    ff_repository->start();
+    // wait for FlowFileRepository to start and notify the owners of
+    // the resurrected FlowFiles
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+
+    // write the third file into the input
+    flow.write("three");
+
+    flow.trigger();
+    ff_repository->stop();
+    flowController->unload();
+
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    auto file = flow.output->poll(expired);
+    REQUIRE(file);
+    REQUIRE(expired.empty());
+
+    auto content = flow.read(file);
+    auto isOneOfPossibleResults =
+        Catch::Equals("_Header_one_Demarcator_two_Demarcator_three_Footer_")
+        || Catch::Equals("_Header_two_Demarcator_one_Demarcator_three_Footer_");
+
+    REQUIRE_THAT(content, isOneOfPossibleResults);
+  }
+}
+
+class ContentUpdaterProcessor : public core::Processor{
+ public:
+  ContentUpdaterProcessor(const std::string& name, utils::Identifier& id) : Processor(name, id) {}
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override {
+    auto ff = session->get();
+    std::string data = "<override>";
+    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+    session->importFrom(stream, ff);
+    session->transfer(ff, {"success", "d"});
+  }
+};
+
+std::shared_ptr<core::Processor> setupContentUpdaterProcessor(utils::Identifier& id) {
+  return std::make_shared<ContentUpdaterProcessor>("Updater", id);
+}
+
+TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/tmp/test.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  {
+    TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
+
+    flowController->load(flow.root);
+    ff_repository->start();
+
+    // write two files into the input
+    auto flowFile = flow.write("data");
+    auto claim = flowFile->getResourceClaim();
+    // one from the FlowFile and one from the persisted instance
+    REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
+    // update them with the Merge Processor
+    flow.trigger();
+
+    auto content = flow.read(flowFile);
+    REQUIRE(content == "<override>");
+    auto newClaim = flowFile->getResourceClaim();
+    // the processor added new content to the flowFile
+    REQUIRE(claim != newClaim);
+    // nobody holds an owning reference to the previous claim
+    REQUIRE(claim->getFlowFileRecordOwnedCount() == 0);
+    // one from the FlowFile and one from the persisted instance
+    REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
+
+    ff_repository->stop();
+    flowController->unload();
+  }
+
+  // swap the ProcessGroup and restart the FlowController
+  {
+    TestFlow flow(ff_repository, content_repo, prov_repo, setupContentUpdaterProcessor, {"success", "d"});
+
+    flowController->load(flow.root);
+    ff_repository->start();
+    // wait for FlowFileRepository to start and notify the owners of
+    // the resurrected FlowFiles
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    auto file = flow.output->poll(expired);
+    REQUIRE(file);
+    REQUIRE(expired.empty());
+
+    auto content = flow.read(file);
+    REQUIRE(content == "<override>");
+    // the still persisted instance and this FlowFile
+    REQUIRE(file->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+
+    ff_repository->stop();
+    flowController->unload();
+  }
+}
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 7521d6f..b48ebdd 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -164,23 +164,22 @@
 
   std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
 
-  minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
+  {
+    minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
 
-  record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
+    record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
 
-  record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
+    record.addAttribute("", "hasdgasdgjsdgasgdsgsadaskgasd");
 
-  REQUIRE(record.Serialize());
+    REQUIRE(record.Serialize());
 
-  claim->decreaseFlowFileRecordOwnedCount();
+    REQUIRE(repository->Delete(record.getUUIDStr()));
+    claim->decreaseFlowFileRecordOwnedCount();
 
-  claim->decreaseFlowFileRecordOwnedCount();
+    repository->flush();
 
-  repository->Delete(record.getUUIDStr());
-
-  repository->flush();
-
-  repository->stop();
+    repository->stop();
+  }
 
   std::ifstream fileopen(ss.str(), std::ios::in);
   REQUIRE(!fileopen.good());
@@ -272,7 +271,9 @@
   ff_repository->initialize(config);
   content_repo->initialize(config);
 
+  core::Relationship inputRel{"Input", "dummy"};
   std::shared_ptr<minifi::Connection> input = std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+  input->setRelationship(inputRel);
 
   auto root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
   root->addConnection(input);
@@ -292,13 +293,18 @@
    */
   {
     std::shared_ptr<core::Processor> processor = std::make_shared<core::Processor>("dummy");
+    utils::Identifier uuid;
+    REQUIRE(processor->getUUID(uuid));
+    input->setSourceUUID(uuid);
+    processor->addConnection(input);
     std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
     std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
     auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
     core::ProcessSession sessionGenFlowFile(context);
     std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     sessionGenFlowFile.importFrom(content, flow);
-    input->put(flow);  // stores it in the flowFileRepository
+    sessionGenFlowFile.transfer(flow, inputRel);
+    sessionGenFlowFile.commit();
   }
 
   // remove flow from the connection but it is still present in the