MINIFICPP-1340 - Batch persist content changes

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #887
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index c3042f6..2d4284d 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -22,6 +22,7 @@
 #include "RocksDbStream.h"
 #include "rocksdb/merge_operator.h"
 #include "utils/GeneralUtils.h"
+#include "Exception.h"
 
 namespace org {
 namespace apache {
@@ -65,13 +66,53 @@
   db_.reset();
 }
 
+DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : ContentSession(std::move(repository)) {}
+
+std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
+  return std::make_shared<Session>(sharedFromThis());
+}
+
+void DatabaseContentRepository::Session::commit() {
+  auto dbContentRepository = std::static_pointer_cast<DatabaseContentRepository>(repository_);
+  auto opendb = dbContentRepository->db_->open();
+  if (!opendb) {
+    throw Exception(REPOSITORY_EXCEPTION, "Couldn't open rocksdb database to commit content changes");
+  }
+  rocksdb::WriteBatch batch;
+  for (const auto& resource : managedResources_) {
+    auto outStream = dbContentRepository->write(*resource.first, false, &batch);
+    if (outStream == nullptr) {
+      throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
+    }
+    const auto size = resource.second->getSize();
+    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+      throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
+    }
+  }
+  for (const auto& resource : extendedResources_) {
+    auto outStream = dbContentRepository->write(*resource.first, true, &batch);
+    if (outStream == nullptr) {
+      throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
+    }
+    const auto size = resource.second->getSize();
+    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+      throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath());
+    }
+  }
+
+  rocksdb::WriteOptions options;
+  options.sync = true;
+  rocksdb::Status status = opendb->Write(options, &batch);
+  if (!status.ok()) {
+    throw Exception(REPOSITORY_EXCEPTION, "Batch write failed: " + status.ToString());
+  }
+
+  managedResources_.clear();
+  extendedResources_.clear();
+}
+
 std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
-  // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
-  // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
-  if (!is_valid_ || !db_)
-    return nullptr;
-  // append is already supported in all modes
-  return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
+  return write(claim, append, nullptr);
 }
 
 std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::ResourceClaim &claim) {
@@ -109,7 +150,7 @@
   rocksdb::Status status;
   status = opendb->Delete(rocksdb::WriteOptions(), claim.getContentFullPath());
   if (status.ok()) {
-    logger_->log_debug("Deleted %s", claim.getContentFullPath());
+    logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
     return true;
   } else {
     logger_->log_debug("Attempted, but could not delete %s", claim.getContentFullPath());
@@ -117,6 +158,15 @@
   }
 }
 
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim& claim, bool append, rocksdb::WriteBatch* batch) {
+  // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
+  // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
+  if (!is_valid_ || !db_)
+    return nullptr;
+  // append is already supported in all modes
+  return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
+}
+
 } /* namespace repository */
 } /* namespace core */
 } /* namespace minifi */
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 6abc476..a0bd595 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -26,6 +26,7 @@
 #include "properties/Configure.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "RocksDatabase.h"
+#include "core/ContentSession.h"
 
 namespace org {
 namespace apache {
@@ -70,6 +71,12 @@
  * DatabaseContentRepository is a content repository that stores data onto the local file system.
  */
 class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
+  class Session : public ContentSession {
+   public:
+    explicit Session(std::shared_ptr<ContentRepository> repository);
+
+    void commit() override;
+  };
  public:
 
   DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), utils::Identifier uuid = utils::Identifier())
@@ -78,34 +85,36 @@
         db_(nullptr),
         logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
   }
-  virtual ~DatabaseContentRepository() {
+  ~DatabaseContentRepository() override {
     stop();
   }
 
-  virtual bool initialize(const std::shared_ptr<minifi::Configure> &configuration);
+  std::shared_ptr<ContentSession> createSession() override;
 
-  virtual void stop();
+  bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override;
 
-  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false);
+  void stop() override;
 
-  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
+  std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override;
 
-  virtual bool close(const minifi::ResourceClaim &claim) {
+  std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override;
+
+  bool close(const minifi::ResourceClaim &claim) override {
     return remove(claim);
   }
 
-  virtual bool remove(const minifi::ResourceClaim &claim);
+  bool remove(const minifi::ResourceClaim &claim) override;
 
-  virtual bool exists(const minifi::ResourceClaim &streamId);
+  bool exists(const minifi::ResourceClaim &streamId) override;
 
-  virtual void yield() {
+  void yield() override {
 
   }
 
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() override {
     return true;
   }
 
@@ -113,11 +122,13 @@
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
  private:
+  std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, rocksdb::WriteBatch* batch);
+
   bool is_valid_;
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
   std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index decb030..8fe6178 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -30,11 +30,12 @@
 namespace minifi {
 namespace io {
 
-RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable)
+RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable, rocksdb::WriteBatch* batch)
     : BaseStream(),
       path_(std::move(path)),
       write_enable_(write_enable),
-      db_(std::move(db)),
+      db_(db),
+      batch_(batch),
       logger_(logging::LoggerFactory<RocksDbStream>::getLogger()) {
   auto opendb = db_->open();
   exists_ = opendb && opendb->Get(rocksdb::ReadOptions(), path_, &value_).ok();
@@ -71,11 +72,15 @@
     rocksdb::Slice slice_value((const char *) value, size);
     rocksdb::Status status;
     size_ += size;
-    rocksdb::WriteOptions opts;
-    opts.sync = true;
-    opendb->Merge(opts, path_, slice_value);
+    if (batch_ != nullptr) {
+      status = batch_->Merge(path_, slice_value);
+    } else {
+      rocksdb::WriteOptions opts;
+      opts.sync = true;
+      status = opendb->Merge(opts, path_, slice_value);
+    }
     if (status.ok()) {
-      return 0;
+      return size;
     } else {
       return -1;
     }
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index c62d27b..a762753 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -46,7 +46,7 @@
    * File Stream constructor that accepts an fstream shared pointer.
    * It must already be initialized for read and write.
    */
-  explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false);
+  explicit RocksDbStream(std::string path, gsl::not_null<minifi::internal::RocksDatabase*> db, bool write_enable = false, rocksdb::WriteBatch* batch = nullptr);
 
   ~RocksDbStream() override {
     closeStream();
@@ -162,6 +162,8 @@
 
   gsl::not_null<minifi::internal::RocksDatabase*> db_;
 
+  rocksdb::WriteBatch* batch_;
+
   size_t size_;
 
  private:
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
index a2b0f89..ef85076 100644
--- a/libminifi/include/Exception.h
+++ b/libminifi/include/Exception.h
@@ -44,11 +44,12 @@
   SITE2SITE_EXCEPTION,
   GENERAL_EXCEPTION,
   REGEX_EXCEPTION,
+  REPOSITORY_EXCEPTION,
   MAX_EXCEPTION
 };
 
 static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", "Flow File Operation", "Processor Operation", "Process Session Operation", "Process Schedule Operation", "Site2Site Protocol",
-    "General Operation", "Regex Operation" };
+    "General Operation", "Regex Operation", "Repository Operation" };
 
 inline const char *ExceptionTypeToString(ExceptionType type) {
   if (type < MAX_EXCEPTION)
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 2b9bbae..68c44fd 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -28,6 +28,8 @@
 #include "io/BaseStream.h"
 #include "StreamManager.h"
 #include "core/Connectable.h"
+#include "ContentSession.h"
+#include "utils/GeneralUtils.h"
 
 namespace org {
 namespace apache {
@@ -38,7 +40,7 @@
 /**
  * Content repository definition that extends StreamManager.
  */
-class ContentRepository : public StreamManager<minifi::ResourceClaim> {
+class ContentRepository : public StreamManager<minifi::ResourceClaim>, public utils::EnableSharedFromThis<ContentRepository> {
  public:
   virtual ~ContentRepository() = default;
 
@@ -47,54 +49,22 @@
    */
   virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
 
-  virtual std::string getStoragePath() const {
-    return directory_;
-  }
+  virtual std::string getStoragePath() const;
+
+  virtual std::shared_ptr<ContentSession> createSession();
 
   /**
    * Stops this repository.
    */
   virtual void stop() = 0;
 
-  void reset() {
-    std::lock_guard<std::mutex> lock(count_map_mutex_);
-    count_map_.clear();
-  }
+  void reset();
 
-  virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId) {
-    std::lock_guard<std::mutex> lock(count_map_mutex_);
-    auto cnt = count_map_.find(streamId.getContentFullPath());
-    if (cnt != count_map_.end()) {
-      return cnt->second;
-    } else {
-      return 0;
-    }
-  }
+  virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId);
 
-  virtual void incrementStreamCount(const minifi::ResourceClaim &streamId) {
-    std::lock_guard<std::mutex> lock(count_map_mutex_);
-    const std::string str = streamId.getContentFullPath();
-    auto count = count_map_.find(str);
-    if (count != count_map_.end()) {
-      count_map_[str] = count->second + 1;
-    } else {
-      count_map_[str] = 1;
-    }
-  }
+  virtual void incrementStreamCount(const minifi::ResourceClaim &streamId);
 
-  virtual StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) {
-    std::lock_guard<std::mutex> lock(count_map_mutex_);
-    const std::string str = streamId.getContentFullPath();
-    auto count = count_map_.find(str);
-    if (count != count_map_.end() && count->second > 1) {
-      count_map_[str] = count->second - 1;
-      return StreamState::Alive;
-    } else {
-      count_map_.erase(str);
-      remove(streamId);
-      return StreamState::Deleted;
-    }
-  }
+  virtual StreamState decrementStreamCount(const minifi::ResourceClaim &streamId);
 
  protected:
   std::string directory_;
diff --git a/libminifi/include/core/ContentSession.h b/libminifi/include/core/ContentSession.h
new file mode 100644
index 0000000..a7144f3
--- /dev/null
+++ b/libminifi/include/core/ContentSession.h
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class ContentRepository;
+
+class ContentSession {
+ public:
+  enum class WriteMode {
+    OVERWRITE,
+    APPEND
+  };
+
+  explicit ContentSession(std::shared_ptr<ContentRepository> repository);
+
+  std::shared_ptr<ResourceClaim> create();
+
+  std::shared_ptr<io::BaseStream> write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode = WriteMode::OVERWRITE);
+
+  std::shared_ptr<io::BaseStream> read(const std::shared_ptr<ResourceClaim>& resourceId);
+
+  virtual void commit();
+
+  void rollback();
+
+  virtual ~ContentSession() = default;
+
+ protected:
+  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BaseStream>> managedResources_;
+  std::map<std::shared_ptr<ResourceClaim>, std::shared_ptr<io::BaseStream>> extendedResources_;
+  std::shared_ptr<ContentRepository> repository_;
+};
+
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
+
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index e6e4f23..7d0f535 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -57,6 +57,7 @@
     logger_->log_trace("ProcessSession created for %s", process_context_->getProcessorNode()->getName());
     auto repo = process_context_->getProvenanceRepository();
     provenance_report_ = std::make_shared<provenance::ProvenanceReporter>(repo, process_context_->getProcessorNode()->getName(), process_context_->getProcessorNode()->getName());
+    content_session_ = process_context_->getContentRepository()->createSession();
   }
 
   // Destructor
@@ -70,6 +71,9 @@
   std::shared_ptr<provenance::ProvenanceReporter> getProvenanceReporter() {
     return provenance_report_;
   }
+  // writes the created contents to the underlying repository
+  void flushContent();
+
   // Get the FlowFile from the highest priority queue
   virtual std::shared_ptr<core::FlowFile> get();
   // Create a new UUID FlowFile with no content resource claim and without parent
@@ -165,6 +169,8 @@
   // Provenance Report
   std::shared_ptr<provenance::ProvenanceReporter> provenance_report_;
 
+  std::shared_ptr<ContentSession> content_session_;
+
   static std::shared_ptr<utils::IdGenerator> id_generator_;
 };
 
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 03ece73..445f482 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -30,6 +30,8 @@
 #include "properties/Configure.h"
 #include "core/Connectable.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/GeneralUtils.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -41,7 +43,12 @@
  * Purpose: Stages content into a volatile area of memory. Note that   when the maximum number
  * of entries is consumed we will rollback a session to wait for others to be freed.
  */
-class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<ResourceClaim::Path> {
+class VolatileContentRepository :
+    public core::ContentRepository,
+    public core::repository::VolatileRepository<ResourceClaim::Path>,
+    public utils::EnableSharedFromThis<VolatileContentRepository> {
+  using utils::EnableSharedFromThis<VolatileContentRepository>::sharedFromThis;
+
  public:
   static const char *minimal_locking;
 
@@ -109,11 +116,6 @@
 
   virtual void run();
 
-  template<typename T2>
-  std::shared_ptr<T2> shared_from_parent() {
-    return std::dynamic_pointer_cast<T2>(shared_from_this());
-  }
-
  private:
   bool minimize_locking_;
 
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 1c65f0e..f826fee 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -35,7 +35,9 @@
  * Volatile flow file repository. keeps a running counter of the current location, freeing
  * those which we no longer hold.
  */
-class VolatileFlowFileRepository : public VolatileRepository<std::string> {
+class VolatileFlowFileRepository : public VolatileRepository<std::string>, public utils::EnableSharedFromThis<VolatileFlowFileRepository> {
+  using utils::EnableSharedFromThis<VolatileFlowFileRepository>::sharedFromThis;
+
  public:
   explicit VolatileFlowFileRepository(std::string repo_name = "", std::string dir = REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME, int64_t maxPartitionBytes =
   MAX_REPOSITORY_STORAGE_SIZE,
@@ -59,7 +61,7 @@
     if (purge_required_ && nullptr != content_repo_) {
       std::lock_guard<std::mutex> lock(purge_mutex_);
       for (auto purgeItem : purge_list_) {
-        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(sharedFromThis(), content_repo_);
         if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
           auto claim = eventRead->getResourceClaim();
           if (claim) claim->decreaseFlowFileRecordOwnedCount();
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 31c0f87..a5b395a 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -51,7 +51,9 @@
  * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
  */
 template<typename T>
-class VolatileRepository : public core::Repository, public std::enable_shared_from_this<VolatileRepository<T>> {
+class VolatileRepository : public core::Repository, public utils::EnableSharedFromThis<VolatileRepository<T>> {
+  using utils::EnableSharedFromThis<VolatileRepository<T>>::sharedFromThis;
+
  public:
   static const char *volatile_repo_max_count;
   static const char *volatile_repo_max_bytes;
@@ -404,7 +406,7 @@
   if (running_)
     return;
   running_ = true;
-  thread_ = std::thread(&VolatileRepository<T>::run, std::enable_shared_from_this<VolatileRepository<T>>::shared_from_this());
+  thread_ = std::thread(&VolatileRepository<T>::run, sharedFromThis());
   logger_->log_debug("%s Repository Monitor Thread Start", name_);
 }
 #if defined(__clang__)
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 1884a62..c772cd3 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -141,6 +141,9 @@
 // data stream overrides
 template<typename T>
 int AtomicEntryStream<T>::writeData(uint8_t *value, int size) {
+  if (size == 0) {
+    return 0;
+  }
   if (nullptr != value && !invalid_stream_) {
     std::lock_guard<std::recursive_mutex> lock(entry_lock_);
     if (entry_->insert(key_, value, size)) {
@@ -176,6 +179,9 @@
 
 template<typename T>
 int AtomicEntryStream<T>::readData(uint8_t *buf, int buflen) {
+  if (buflen == 0) {
+    return 0;
+  }
   if (nullptr != buf && !invalid_stream_) {
     std::lock_guard<std::recursive_mutex> lock(entry_lock_);
     int len = buflen;
diff --git a/libminifi/include/utils/GeneralUtils.h b/libminifi/include/utils/GeneralUtils.h
index 2d45724..ee91627 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -68,6 +68,30 @@
 using std::void_t;
 #endif /* < C++17 */
 
+namespace internal {
+
+/*
+ * We need this base class to enable safe multiple inheritance
+ * from std::enable_shared_from_this, it also needs to be polymorphic
+ * to allow dynamic_cast to the derived class.
+ */
+struct EnableSharedFromThisBase : std::enable_shared_from_this<EnableSharedFromThisBase> {
+  virtual ~EnableSharedFromThisBase() = default;
+};
+
+}  // namespace internal
+
+/*
+ * The virtual inheritance ensures that each most-derived object
+ * contains only a single std::weak_ptr, even when it inherits
+ * EnableSharedFromThis through multiple paths.
+ */
+template<typename T>
+struct EnableSharedFromThis : virtual internal::EnableSharedFromThisBase {
+  std::shared_ptr<T> sharedFromThis() {
+    return std::dynamic_pointer_cast<T>(internal::EnableSharedFromThisBase::shared_from_this());
+  }
+};
+
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/src/core/ContentRepository.cpp b/libminifi/src/core/ContentRepository.cpp
new file mode 100644
index 0000000..c4d06ed
--- /dev/null
+++ b/libminifi/src/core/ContentRepository.cpp
@@ -0,0 +1,84 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "core/ContentRepository.h"
+#include "core/ContentSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+std::string ContentRepository::getStoragePath() const {
+  return directory_;
+}
+
+void ContentRepository::reset() {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  count_map_.clear();
+}
+
+std::shared_ptr<ContentSession> ContentRepository::createSession() {
+  return std::make_shared<ContentSession>(sharedFromThis());
+}
+
+uint32_t ContentRepository::getStreamCount(const minifi::ResourceClaim &streamId) {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  auto cnt = count_map_.find(streamId.getContentFullPath());
+  if (cnt != count_map_.end()) {
+    return cnt->second;
+  } else {
+    return 0;
+  }
+}
+
+void ContentRepository::incrementStreamCount(const minifi::ResourceClaim &streamId) {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  const std::string str = streamId.getContentFullPath();
+  auto count = count_map_.find(str);
+  if (count != count_map_.end()) {
+    count_map_[str] = count->second + 1;
+  } else {
+    count_map_[str] = 1;
+  }
+}
+
+ContentRepository::StreamState ContentRepository::decrementStreamCount(const minifi::ResourceClaim &streamId) {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  const std::string str = streamId.getContentFullPath();
+  auto count = count_map_.find(str);
+  if (count != count_map_.end() && count->second > 1) {
+    count_map_[str] = count->second - 1;
+    return StreamState::Alive;
+  } else {
+    count_map_.erase(str);
+    remove(streamId);
+    return StreamState::Deleted;
+  }
+}
+
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/ContentSession.cpp b/libminifi/src/core/ContentSession.cpp
new file mode 100644
index 0000000..f5934f9
--- /dev/null
+++ b/libminifi/src/core/ContentSession.cpp
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include "core/ContentRepository.h"
+#include "core/ContentSession.h"
+#include "ResourceClaim.h"
+#include "io/BaseStream.h"
+#include "Exception.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+ContentSession::ContentSession(std::shared_ptr<ContentRepository> repository) : repository_(std::move(repository)) {}
+
+std::shared_ptr<ResourceClaim> ContentSession::create() {
+  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(repository_);
+  managedResources_[claim] = std::make_shared<io::BaseStream>();
+  return claim;
+}
+
+std::shared_ptr<io::BaseStream> ContentSession::write(const std::shared_ptr<ResourceClaim>& resourceId, WriteMode mode) {
+  auto it = managedResources_.find(resourceId);
+  if (it == managedResources_.end()) {
+    if (mode == WriteMode::OVERWRITE) {
+      throw Exception(REPOSITORY_EXCEPTION, "Can only overwrite owned resource");
+    }
+    auto& extension = extendedResources_[resourceId];
+    if (!extension) {
+      extension = std::make_shared<io::BaseStream>();
+    }
+    return extension;
+  }
+  if (mode == WriteMode::OVERWRITE) {
+    it->second = std::make_shared<io::BaseStream>();
+  }
+  return it->second;
+}
+
+std::shared_ptr<io::BaseStream> ContentSession::read(const std::shared_ptr<ResourceClaim>& resourceId) {
+  // TODO(adebreceni):
+  //  after the stream refactor is merged we should be able to share the underlying buffer
+  //  between multiple InputStreams, moreover create a ConcatInputStream
+  if (managedResources_.find(resourceId) != managedResources_.end() || extendedResources_.find(resourceId) != extendedResources_.end()) {
+    throw Exception(REPOSITORY_EXCEPTION, "Can only read non-modified resource");
+  }
+  return repository_->read(*resourceId);
+}
+
+void ContentSession::commit() {
+  for (const auto& resource : managedResources_) {
+    auto outStream = repository_->write(*resource.first);
+    if (outStream == nullptr) {
+      throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
+    }
+    const auto size = resource.second->getSize();
+    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+      throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
+    }
+  }
+  for (const auto& resource : extendedResources_) {
+    auto outStream = repository_->write(*resource.first, true);
+    if (outStream == nullptr) {
+      throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
+    }
+    const auto size = resource.second->getSize();
+    if (outStream->write(const_cast<uint8_t*>(resource.second->getBuffer()), size) != size) {
+      throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath());
+    }
+  }
+
+  managedResources_.clear();
+  extendedResources_.clear();
+}
+
+void ContentSession::rollback() {
+  managedResources_.clear();
+  extendedResources_.clear();
+}
+
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
+
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index b0481a5..20cc9ea 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -241,11 +241,11 @@
 }
 
 void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+  std::shared_ptr<ResourceClaim> claim = content_session_->create();
 
   try {
     uint64_t startTime = utils::timeutils::getTimeMillis();
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
+    std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
     // Call the callback to write the content
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
@@ -280,7 +280,7 @@
 
   try {
     uint64_t startTime = utils::timeutils::getTimeMillis();
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim, true);
+    std::shared_ptr<io::BaseStream> stream = content_session_->write(claim, ContentSession::WriteMode::APPEND);
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
     }
@@ -323,7 +323,7 @@
 
     claim = flow->getResourceClaim();
 
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(*claim);
+    std::shared_ptr<io::BaseStream> stream = content_session_->read(claim);
 
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
@@ -350,13 +350,13 @@
  *
  */
 void ProcessSession::importFrom(io::DataStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+  std::shared_ptr<ResourceClaim> claim = content_session_->create();
   size_t max_read = getpagesize();
   std::vector<uint8_t> charBuffer(max_read);
 
   try {
     auto startTime = utils::timeutils::getTimeMillis();
-    std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(*claim);
+    std::shared_ptr<io::BaseStream> content_stream = content_session_->write(claim);
 
     if (nullptr == content_stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
@@ -394,7 +394,7 @@
 }
 
 void ProcessSession::import(std::string source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource, uint64_t offset) {
-  std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+  std::shared_ptr<ResourceClaim> claim = content_session_->create();
   size_t size = getpagesize();
   std::vector<uint8_t> charBuffer(size);
 
@@ -402,7 +402,8 @@
     auto startTime = utils::timeutils::getTimeMillis();
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
+    std::shared_ptr<io::BaseStream> stream = content_session_->write(claim);
+
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
     }
@@ -516,10 +517,10 @@
         /* Create claim and stream if needed and append data */
         if (claim == nullptr) {
           startTime = utils::timeutils::getTimeMillis();
-          claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+          claim = content_session_->create();
         }
         if (stream == nullptr) {
-          stream = process_context_->getContentRepository()->write(*claim);
+          stream = content_session_->write(claim);
         }
         if (stream == nullptr) {
           logger_->log_error("Stream is null");
@@ -799,6 +800,8 @@
         }
     }
 
+    content_session_->commit();
+
     persistFlowFilesBeforeTransfer(connectionQueues, _flowFileSnapShots);
 
     for (auto& cq : connectionQueues) {
@@ -869,6 +872,8 @@
       }
     }
 
+    content_session_->rollback();
+
     _flowFileSnapShots.clear();
 
     _clonedFlowFiles.clear();
@@ -997,6 +1002,10 @@
   return nullptr;
 }
 
+void ProcessSession::flushContent() {
+  content_session_->commit();
+}
+
 bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) {
   std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship);
   Connection * connection = nullptr;
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index d7039cf..b015f59 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -72,7 +72,7 @@
     return;
   if (running_)
     return;
-  thread_ = std::thread(&VolatileContentRepository::run, shared_from_parent<VolatileContentRepository>());
+  thread_ = std::thread(&VolatileContentRepository::run, sharedFromThis());
   thread_.detach();
   running_ = true;
   logger_->log_info("%s Repository Monitor Thread Start", getName());
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index e3bdf08..8912f7b 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -33,7 +33,9 @@
 int DataStream::writeData(uint8_t *value, int size) {
   if (value == nullptr)
     return 0;
-  std::copy(value, value + size, std::back_inserter(buffer));
+  std::size_t previous_size = buffer.size();
+  buffer.resize(previous_size + size);
+  std::memcpy(buffer.data() + previous_size, value, size);
   return size;
 }
 
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index fe6e036..13d1707 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -119,32 +119,31 @@
 
     std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
 
-    processor_ = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
-    processor_->initialize();
+    processor = std::make_shared<org::apache::nifi::minifi::processors::CompressContent>("compresscontent");
+    processor->initialize();
     utils::Identifier processoruuid;
-    REQUIRE(true == processor_->getUUID(processoruuid));
+    REQUIRE(true == processor->getUUID(processoruuid));
 
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
     // connection from compress processor to log attribute
-    output_ = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
-    output_->addRelationship(core::Relationship("success", "compress successful output"));
-    output_->setSource(processor_);
-    output_->setSourceUUID(processoruuid);
-    processor_->addConnection(output_);
+    output = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
+    output->addRelationship(core::Relationship("success", "compress successful output"));
+    output->setSource(processor);
+    output->setSourceUUID(processoruuid);
+    processor->addConnection(output);
     // connection to compress processor
-    input_ = std::make_shared<minifi::Connection>(repo, content_repo, "Input");
-    input_->setDestination(processor_);
-    input_->setDestinationUUID(processoruuid);
-    processor_->addConnection(input_);
+    input = std::make_shared<minifi::Connection>(repo, content_repo, "Input");
+    input->setDestination(processor);
+    input->setDestinationUUID(processoruuid);
+    processor->addConnection(input);
 
-    processor_->setAutoTerminatedRelationships({{"failure", ""}});
+    processor->setAutoTerminatedRelationships({{"failure", ""}});
 
-    processor_->incrementActiveTasks();
-    processor_->setScheduledState(core::ScheduledState::RUNNING);
+    processor->incrementActiveTasks();
+    processor->setScheduledState(core::ScheduledState::RUNNING);
 
-    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor_);
-    context_ = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+    context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor), nullptr, repo, repo, content_repo);
   }
 
  public:
@@ -178,13 +177,15 @@
 
   virtual ~CompressDecompressionTestController() = 0;
 
-  std::shared_ptr<core::Processor> processor_;
-  std::shared_ptr<core::ProcessContext> context_;
-  std::shared_ptr<minifi::Connection> output_;
-  std::shared_ptr<minifi::Connection> input_;
+  std::shared_ptr<core::Processor> processor;
+  std::shared_ptr<core::ProcessContext> context;
+  std::shared_ptr<minifi::Connection> output;
+  std::shared_ptr<minifi::Connection> input;
 };
 
-CompressDecompressionTestController::~CompressDecompressionTestController() = default;
+CompressDecompressionTestController::~CompressDecompressionTestController() {
+  LogTestController::getInstance().reset();
+}
 
 std::string CompressDecompressionTestController::tempDir_;
 std::string CompressDecompressionTestController::raw_content_path_;
@@ -233,13 +234,7 @@
   }
 };
 
-TEST_CASE("CompressFileGZip", "[compressfiletest1]") {
-  CompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -247,7 +242,8 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -270,20 +266,13 @@
     sessionGenFlowFile.read(flow1, &callback);
     callback.archive_read();
     std::string content(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-    REQUIRE(testController.getRawContent() == content);
+    REQUIRE(getRawContent() == content);
     // write the compress content for next test
-    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("DecompressFileGZip", "[compressfiletest2]") {
-  DecompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfiletest2]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_GZIP);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -291,7 +280,8 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -312,18 +302,11 @@
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string content(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-    REQUIRE(testController.getRawContent() == content);
+    REQUIRE(getRawContent() == content);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("CompressFileBZip", "[compressfiletest3]") {
-  CompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -331,7 +314,8 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -354,21 +338,14 @@
     sessionGenFlowFile.read(flow1, &callback);
     callback.archive_read();
     std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-    REQUIRE(testController.getRawContent() == contents);
+    REQUIRE(getRawContent() == contents);
     // write the compress content for next test
-    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
   }
-  LogTestController::getInstance().reset();
 }
 
 
-TEST_CASE("DecompressFileBZip", "[compressfiletest4]") {
-  DecompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfiletest4]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_BZIP2);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -376,7 +353,8 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -397,18 +375,11 @@
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-    REQUIRE(testController.getRawContent() == contents);
+    REQUIRE(getRawContent() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("CompressFileLZMA", "[compressfiletest5]") {
-  CompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_LZMA);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -416,7 +387,8 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -445,21 +417,14 @@
     sessionGenFlowFile.read(flow1, &callback);
     callback.archive_read();
     std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-    REQUIRE(testController.getRawContent() == contents);
+    REQUIRE(getRawContent() == contents);
     // write the compress content for next test
-    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
   }
-  LogTestController::getInstance().reset();
 }
 
 
-TEST_CASE("DecompressFileLZMA", "[compressfiletest6]") {
-  DecompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfiletest6]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -467,8 +432,9 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
   flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-lzma");
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -495,18 +461,11 @@
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-    REQUIRE(testController.getRawContent() == contents);
+    REQUIRE(getRawContent() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("CompressFileXYLZMA", "[compressfiletest7]") {
-  CompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletest7]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_COMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_XZ_LZMA2);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -514,7 +473,8 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -543,21 +503,14 @@
     sessionGenFlowFile.read(flow1, &callback);
     callback.archive_read();
     std::string contents(reinterpret_cast<char *> (callback.archive_buffer_), callback.archive_buffer_size_);
-    REQUIRE(testController.getRawContent() == contents);
+    REQUIRE(getRawContent() == contents);
     // write the compress content for next test
-    testController.writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
+    writeCompressed(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
   }
-  LogTestController::getInstance().reset();
 }
 
 
-TEST_CASE("DecompressFileXYLZMA", "[compressfiletest8]") {
-  DecompressTestController testController;
-  auto context = testController.context_;
-  auto input = testController.input_;
-  auto processor = testController.processor_;
-  auto output = testController.output_;
-
+TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfiletest8]") {
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressMode, MODE_DECOMPRESS);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressFormat, COMPRESSION_FORMAT_ATTRIBUTE);
   context->setProperty(org::apache::nifi::minifi::processors::CompressContent::CompressLevel, "9");
@@ -565,8 +518,9 @@
 
   core::ProcessSession sessionGenFlowFile(context);
   std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(testController.compressedPath(), flow, true, 0);
+  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
   flow->setAttribute(FlowAttributeKey(org::apache::nifi::minifi::MIME_TYPE), "application/x-xz");
+  sessionGenFlowFile.flushContent();
   input->put(flow);
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -593,23 +547,21 @@
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
     sessionGenFlowFile.read(flow1, &callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_), callback.read_size_);
-    REQUIRE(testController.getRawContent() == contents);
+    REQUIRE(getRawContent() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("RawGzipCompressionDecompression", "[compressfiletest8]") {
-  TestController testController;
+TEST_CASE_METHOD(TestController, "RawGzipCompressionDecompression", "[compressfiletest8]") {
   LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::CompressContent>();
   LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::PutFile>();
 
   // Create temporary directories
   char format_src[] = "/tmp/archives.XXXXXX";
-  std::string src_dir = testController.createTempDirectory(format_src);
+  std::string src_dir = createTempDirectory(format_src);
   REQUIRE(!src_dir.empty());
 
   char format_dst[] = "/tmp/archived.XXXXXX";
-  std::string dst_dir = testController.createTempDirectory(format_dst);
+  std::string dst_dir = createTempDirectory(format_dst);
   REQUIRE(!dst_dir.empty());
 
   // Define files
@@ -618,7 +570,7 @@
   std::string decompressed_file = utils::file::FileUtils::concat_path(dst_dir, "src.txt");
 
   // Build MiNiFi processing graph
-  auto plan = testController.createPlan();
+  auto plan = createPlan();
   auto get_file = plan->addProcessor(
       "GetFile",
       "GetFile");
@@ -684,7 +636,7 @@
   std::ofstream{ src_file } << content;
 
   // Run flow
-  testController.runSession(plan, true);
+  runSession(plan, true);
 
   // Check compressed file
   std::ifstream compressed(compressed_file, std::ios::in | std::ios::binary);
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 370987f..24572cd 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -136,31 +136,31 @@
  public:
   MergeTestController() {
     init_file_paths();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::MergeContent>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::processors::MergeContent>();
+    LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
     LogTestController::getInstance().setTrace<core::ProcessSession>();
     LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinFiles>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::Bin>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::processors::BinManager>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::Connection>();
-    LogTestController::getInstance().setTrace<org::apache::nifi::minifi::core::Connectable>();
+    LogTestController::getInstance().setTrace<minifi::processors::BinFiles>();
+    LogTestController::getInstance().setTrace<minifi::processors::Bin>();
+    LogTestController::getInstance().setTrace<minifi::processors::BinManager>();
+    LogTestController::getInstance().setTrace<minifi::Connection>();
+    LogTestController::getInstance().setTrace<minifi::core::Connectable>();
 
     std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+    auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(std::make_shared<minifi::Configure>());
 
-    processor = std::make_shared<org::apache::nifi::minifi::processors::MergeContent>("mergecontent");
-    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+    processor = std::make_shared<processors::MergeContent>("mergecontent");
     processor->initialize();
     utils::Identifier processoruuid;
-    REQUIRE(true == processor->getUUID(processoruuid));
+    REQUIRE(processor->getUUID(processoruuid));
+    std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<minifi::processors::LogAttribute>("logattribute");
     utils::Identifier logAttributeuuid;
-    REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid));
+    REQUIRE(logAttributeProcessor->getUUID(logAttributeuuid));
 
-    auto content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-    content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
     // output from merge processor to log attribute
     output = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection");
-    output->addRelationship(core::Relationship("merged", "Merge successful output"));
+    output->addRelationship(processors::MergeContent::Merge);
     output->setSource(processor);
     output->setDestination(logAttributeProcessor);
     output->setSourceUUID(processoruuid);
@@ -172,36 +172,26 @@
     input->setDestinationUUID(processoruuid);
     processor->addConnection(input);
 
-    std::set<core::Relationship> autoTerminatedRelationships;
-    core::Relationship original("original", "");
-    core::Relationship failure("failure", "");
-    autoTerminatedRelationships.insert(original);
-    autoTerminatedRelationships.insert(failure);
-    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
+    processor->setAutoTerminatedRelationships({processors::MergeContent::Original, processors::MergeContent::Failure});
 
     processor->incrementActiveTasks();
     processor->setScheduledState(core::ScheduledState::RUNNING);
     logAttributeProcessor->incrementActiveTasks();
     logAttributeProcessor->setScheduledState(core::ScheduledState::RUNNING);
 
-    node = std::make_shared<core::ProcessorNode>(processor);
-    context = std::make_shared<core::ProcessContext>(node, nullptr, repo, repo, content_repo);
+    context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor), nullptr, repo, repo, content_repo);
   }
-  ~MergeTestController() = default;
+  ~MergeTestController() {
+    LogTestController::getInstance().reset();
+  }
+
   std::shared_ptr<core::ProcessContext> context;
-  std::shared_ptr<core::ProcessorNode> node;
   std::shared_ptr<core::Processor> processor;
   std::shared_ptr<minifi::Connection> input;
   std::shared_ptr<minifi::Connection> output;
 };
 
-TEST_CASE("MergeFileDefragment", "[mergefiletest1]") {
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
+TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -225,10 +215,8 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 2, 5, 4, 1, 3}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -242,14 +230,9 @@
     else
       flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[2]);
-  input->put(record[5]);
-  input->put(record[4]);
-  input->put(record[1]);
-  input->put(record[3]);
 
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
@@ -278,10 +261,9 @@
     std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
     REQUIRE(callback.to_string() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("MergeFileDefragmentDelimiter", "[mergefiletest2]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefiletest2]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -315,12 +297,6 @@
     expectfileSecond << "footer";
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_FILENAME);
@@ -329,10 +305,8 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::Demarcator, DEMARCATOR_FILE);
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 2, 5, 4, 1, 3}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -346,14 +320,9 @@
     else
       flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[2]);
-  input->put(record[5]);
-  input->put(record[4]);
-  input->put(record[1]);
-  input->put(record[3]);
 
   REQUIRE(processor->getName() == "mergecontent");
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -383,10 +352,9 @@
     std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
     REQUIRE(callback.to_string() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("MergeFileDefragmentDropFlow", "[mergefiletest3]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefiletest3]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -407,24 +375,14 @@
     }
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_DEFRAGMENT);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MaxBinAge, "1 sec");
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
-  // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
-    if (i == 4)
-      continue;
+  // Generate 5 flowfiles, the first three merged into one, the other two merged into another
+  for (const int i : {0, 2, 5, 1, 3}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -438,20 +396,14 @@
     else
       flow->setAttribute(processors::BinFiles::FRAGMENT_INDEX_ATTRIBUTE, std::to_string(i-3));
     flow->setAttribute(processors::BinFiles::FRAGMENT_COUNT_ATTRIBUTE, std::to_string(3));
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[2]);
-  input->put(record[5]);
-  input->put(record[1]);
-  input->put(record[3]);
 
   REQUIRE(processor->getName() == "mergecontent");
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
-  for (int i = 0; i < 6; i++) {
-    if (i == 4)
-      continue;
+  for (int i = 0; i < 5; i++) {
     auto session = std::make_shared<core::ProcessSession>(context);
     processor->onTrigger(context, session);
     session->commit();
@@ -482,10 +434,9 @@
     std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
     REQUIRE(callback.to_string() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("MergeFileBinPack", "[mergefiletest4]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -504,12 +455,6 @@
     }
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -517,22 +462,15 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 1, 2, 3, 4, 5}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
     flow->setAttribute("tag", "tag");
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[1]);
-  input->put(record[2]);
-  input->put(record[3]);
-  input->put(record[4]);
-  input->put(record[5]);
 
   REQUIRE(processor->getName() == "mergecontent");
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -562,11 +500,10 @@
     std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
     REQUIRE(callback.to_string() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
 
-TEST_CASE("MergeFileTar", "[mergefiletest4]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -585,12 +522,6 @@
     }
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_TAR_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -598,22 +529,15 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 1, 2, 3, 4, 5}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
     flow->setAttribute("tag", "tag");
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[1]);
-  input->put(record[2]);
-  input->put(record[3]);
-  input->put(record[4]);
-  input->put(record[5]);
 
   REQUIRE(processor->getName() == "mergecontent");
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -653,10 +577,9 @@
       REQUIRE(archives[i-3].to_string() == contents);
     }
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("MergeFileZip", "[mergefiletest5]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -675,12 +598,6 @@
     }
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_ZIP_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -688,22 +605,15 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, first threes merged to one, second thress merged to one
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 1, 2, 3, 4, 5}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
     flow->setAttribute("tag", "tag");
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[1]);
-  input->put(record[2]);
-  input->put(record[3]);
-  input->put(record[4]);
-  input->put(record[5]);
 
   REQUIRE(processor->getName() == "mergecontent");
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -743,10 +653,9 @@
       REQUIRE(archives[i-3].to_string() == contents);
     }
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("MergeFileOnAttribute", "[mergefiletest5]") {
+TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
     std::ofstream expectfileSecond(EXPECT_MERGE_CONTENT_SECOND, std::ios::binary);
@@ -761,12 +670,6 @@
     }
   }
 
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeFormat, org::apache::nifi::minifi::processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::MergeStrategy, org::apache::nifi::minifi::processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
@@ -774,10 +677,8 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::CorrelationAttributeName, "tag");
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[6];
-
   // Generate 6 flowfiles, even files are merged to one, odd files are merged to an other
-  for (int i = 0; i < 6; i++) {
+  for (const int i : {0, 1, 2, 3, 4, 5}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -785,14 +686,9 @@
       flow->setAttribute("tag", "even");
     else
       flow->setAttribute("tag", "odd");
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[0]);
-  input->put(record[1]);
-  input->put(record[2]);
-  input->put(record[3]);
-  input->put(record[4]);
-  input->put(record[5]);
 
   REQUIRE(processor->getName() == "mergecontent");
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
@@ -820,16 +716,9 @@
     std::string contents((std::istreambuf_iterator<char>(file2)), std::istreambuf_iterator<char>());
     REQUIRE(callback.to_string() == contents);
   }
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
+TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping Only Common Attributes", "[testMergeFileKeepOnlyCommonAttributes]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
 
@@ -849,10 +738,9 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::DelimiterStrategy, org::apache::nifi::minifi::processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[3];
 
   // Generate 3 flowfiles merging all into one
-  for (int i = 0; i < 3; i++) {
+  for (const int i : {1, 2, 0}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -871,11 +759,9 @@
     }
     flow->setAttribute("tagCommon", "common");
 
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[1]);
-  input->put(record[2]);
-  input->put(record[0]);
 
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
@@ -894,17 +780,9 @@
   REQUIRE(attributes.find("tagUnique2") == attributes.end());
   REQUIRE(attributes["tagCommon"] == "common");
   REQUIRE(attributes["mime.type"] == "application/tar");
-
-  LogTestController::getInstance().reset();
 }
 
-TEST_CASE("Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
-  MergeTestController testController;
-  auto context = testController.context;
-  auto processor = testController.processor;
-  auto input = testController.input;
-  auto output = testController.output;
-
+TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Unique Attributes", "[testMergeFileKeepAllUniqueAttributes]") {
   {
     std::ofstream expectfileFirst(EXPECT_MERGE_CONTENT_FIRST, std::ios::binary);
 
@@ -925,10 +803,8 @@
   context->setProperty(org::apache::nifi::minifi::processors::MergeContent::AttributeStrategy, org::apache::nifi::minifi::processors::merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE);
 
   core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> record[3];
-
   // Generate 3 flowfiles merging all into one
-  for (int i = 0; i < 3; i++) {
+  for (const int i : {1, 2, 0}) {
     const auto flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
     std::string flowFileName = std::string(FLOW_FILE) + "." + std::to_string(i) + ".txt";
     sessionGenFlowFile.import(flowFileName, flow, true, 0);
@@ -947,11 +823,9 @@
     }
     flow->setAttribute("tagCommon", "common");
 
-    record[i] = flow;
+    sessionGenFlowFile.flushContent();
+    input->put(flow);
   }
-  input->put(record[1]);
-  input->put(record[2]);
-  input->put(record[0]);
 
   auto factory = std::make_shared<core::ProcessSessionFactory>(context);
   processor->onSchedule(context, factory);
@@ -970,6 +844,4 @@
   REQUIRE(attributes["tagUnique2"] == "unique2");
   REQUIRE(attributes["tagCommon"] == "common");
   REQUIRE(attributes["mime.type"] == "application/tar");
-
-  LogTestController::getInstance().reset();
 }
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 1ba607b..6d376af 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -34,6 +34,7 @@
 #include "../../extensions/libarchive/MergeContent.h"
 #include "../test/BufferReader.h"
 #include "core/repository/VolatileFlowFileRepository.h"
+#include "../../extensions/rocksdb-repos/DatabaseContentRepository.h"
 
 using Connection = minifi::Connection;
 using MergeContent = minifi::processors::MergeContent;
@@ -250,6 +251,7 @@
   LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
   LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
   LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
+  LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
 
   char format[] = "/var/tmp/test.XXXXXX";
   auto dir = testController.createTempDirectory(format);
@@ -269,6 +271,10 @@
     testController.getLogger()->log_info("Using FileSystemRepository");
     content_repo = std::make_shared<core::repository::FileSystemRepository>();
   }
+  SECTION("DatabaseContentRepository") {
+    testController.getLogger()->log_info("Using DatabaseContentRepository");
+    content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  }
   ff_repository->initialize(config);
   content_repo->initialize(config);
 
@@ -298,6 +304,7 @@
       // the processor added new content to the flowFile
       REQUIRE(claim != newClaim);
       // only this instance behind this shared_ptr keeps the resource alive
+      REQUIRE(claim.use_count() == 1);
       REQUIRE(claim->getFlowFileRecordOwnedCount() == 1);
       // one from the FlowFile and one from the persisted instance
       REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
new file mode 100644
index 0000000..ecc987c
--- /dev/null
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "core/Core.h"
+#include "FileSystemRepository.h"
+#include "VolatileContentRepository.h"
+#include "DatabaseContentRepository.h"
+#include "FlowFileRecord.h"
+#include "../TestBase.h"
+
+template<typename ContentRepositoryClass>
+class ContentSessionController : public TestController {  // test fixture: builds a content repository of the given type backed by a fresh temp dir
+ public:
+  ContentSessionController() {
+    char format[] = "/var/tmp/content_repo.XXXXXX";  // mkstemp-style template; TestController owns cleanup of the created dir
+    std::string contentRepoPath = createTempDirectory(format);
+    auto config = std::make_shared<minifi::Configure>();
+    config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, contentRepoPath);  // consumed by DatabaseContentRepository; ignored by the file/volatile repos
+    contentRepository = std::make_shared<ContentRepositoryClass>();
+    contentRepository->initialize(config);
+  }
+
+  ~ContentSessionController() {
+    log.reset();  // NOTE(review): presumably releases the TestController-inherited logger before base teardown — confirm
+  }
+
+  std::shared_ptr<core::ContentRepository> contentRepository;  // repository under test, shared with the test body
+};
+
+const std::shared_ptr<minifi::io::BaseStream>& operator<<(const std::shared_ptr<minifi::io::BaseStream>& stream, const std::string& str) {  // test helper: write all of str to the stream, asserting a full write
+  REQUIRE(stream->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), str.length()) == str.length());  // write() takes a non-const buffer, hence the const_cast
+  return stream;  // returned by reference so calls can be chained
+}
+
+const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<minifi::io::BaseStream>& stream, std::string& str) {  // test helper: drain the whole stream into str
+  str = "";
+  uint8_t buffer[4096]{};
+  while (true) {
+    size_t ret = stream->read(buffer, sizeof(buffer));
+    REQUIRE(ret >= 0);  // NOTE(review): ret is size_t, so this is always true — confirm how read() reports errors (negative int truncated here?)
+    if (ret == 0) {
+      break;  // 0 bytes read == end of stream
+    }
+    str += std::string{reinterpret_cast<char*>(buffer), ret};
+  }
+  return stream;
+}
+
+// TODO(adebreceni):
+//  seems like the current version of Catch2 does not support templated tests;
+//  we should update it instead of creating makeshift macros
+template<typename ContentRepositoryClass>
+void test_template() {  // exercises the ContentSession contract (create/write/append/commit/rollback) against one repository type
+  ContentSessionController<ContentRepositoryClass> controller;
+  std::shared_ptr<core::ContentRepository> contentRepository = controller.contentRepository;
+
+
+  std::shared_ptr<minifi::ResourceClaim> oldClaim;
+  {
+    auto session = contentRepository->createSession();
+    oldClaim = session->create();
+    session->write(oldClaim) << "data";
+    session->commit();  // persist "data" so the next session sees oldClaim as pre-existing content
+  }
+
+  auto session = contentRepository->createSession();
+  REQUIRE_THROWS(session->write(oldClaim));  // overwriting already-committed content is rejected
+
+  REQUIRE_NOTHROW(session->read(oldClaim));
+  session->write(oldClaim, core::ContentSession::WriteMode::APPEND) << "-addendum";
+  REQUIRE_THROWS(session->read(oldClaim));  // now throws because we appended to the content
+
+  auto claim1 = session->create();
+  session->write(claim1) << "hello content!";
+  REQUIRE_THROWS(session->read(claim1));  // TODO(adebreceni): we currently have no means to create joined streams
+
+  auto claim2 = session->create();
+  session->write(claim2, core::ContentSession::WriteMode::APPEND) << "beginning";  // APPEND on a fresh claim starts from empty
+  session->write(claim2, core::ContentSession::WriteMode::APPEND) << "-end";
+
+  auto claim3 = session->create();
+  session->write(claim3) << "first";
+  session->write(claim3, core::ContentSession::WriteMode::APPEND) << "-last";  // append after an uncommitted overwrite extends it
+
+  auto claim4 = session->create();
+  session->write(claim4) << "beginning";
+  session->write(claim4) << "overwritten";  // second plain write replaces the first, uncommitted one
+
+  SECTION("Commit") {
+    session->commit();
+
+    std::string content;
+    contentRepository->read(*oldClaim) >> content;
+    REQUIRE(content == "data-addendum");
+
+    contentRepository->read(*claim1) >> content;
+    REQUIRE(content == "hello content!");
+
+    contentRepository->read(*claim2) >> content;
+    REQUIRE(content == "beginning-end");
+
+    contentRepository->read(*claim3) >> content;
+    REQUIRE(content == "first-last");
+
+    contentRepository->read(*claim4) >> content;
+    REQUIRE(content == "overwritten");
+  }
+
+  SECTION("Rollback") {
+    session->rollback();
+
+    std::string content;
+    contentRepository->read(*oldClaim) >> content;
+    REQUIRE(content == "data");  // the uncommitted "-addendum" must be gone
+
+    REQUIRE(!contentRepository->exists(*claim1));  // claims created in the rolled-back session leave no trace
+    REQUIRE(!contentRepository->exists(*claim2));
+    REQUIRE(!contentRepository->exists(*claim3));
+    REQUIRE(!contentRepository->exists(*claim4));
+  }
+}
+
+TEST_CASE("ContentSession behavior") {  // runs the same contract test against every ContentRepository implementation
+  SECTION("FileSystemRepository") {
+    test_template<core::repository::FileSystemRepository>();
+  }
+  SECTION("VolatileContentRepository") {
+    test_template<core::repository::VolatileContentRepository>();
+  }
+  SECTION("DatabaseContentRepository") {
+    test_template<core::repository::DatabaseContentRepository>();
+  }
+}
diff --git a/nanofi/src/cxx/Plan.cpp b/nanofi/src/cxx/Plan.cpp
index e6a0211..f4c5eb0 100644
--- a/nanofi/src/cxx/Plan.cpp
+++ b/nanofi/src/cxx/Plan.cpp
@@ -171,6 +171,7 @@
       flowFile->setAttribute(kv.first, kv.second);
     }
     current_session->importFrom(*(input_ff_params->content_stream.get()), flowFile);
+    current_session->flushContent();
     current_session->transfer(flowFile, core::Relationship("success", "success"));
     relationships_[relationships_.size()-1]->put(std::static_pointer_cast<core::FlowFile>(flowFile));
   }