MINIFICPP-1972 - Refactor State Manager code

Co-authored-by: adamdebreceni <64783590+adamdebreceni@users.noreply.github.com>
Co-authored-by: Ferenc Gerlits <fgerlits@gmail.com>
Signed-off-by: Gabor Gyimesi <gamezbird@gmail.com>

This closes #1443
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index be72d8c..8dede17 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -128,7 +128,7 @@
  protected:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SourceInitiatedSubscriptionListener>::getLogger();
 
-  core::CoreComponentStateManager* state_manager_;
+  core::StateManager* state_manager_;
 
   std::shared_ptr<core::ProcessSessionFactory> session_factory_;
 
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
similarity index 68%
rename from extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
rename to extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
index e4b38f8..9227e9a 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
+#include <cinttypes>
 #include <fstream>
 #include <utility>
 
-#include "RocksDbPersistableKeyValueStoreService.h"
+#include "RocksDbStateStorage.h"
 #include "../encryption/RocksDbEncryptionProvider.h"
 #include "utils/StringUtils.h"
 #include "core/PropertyBuilder.h"
@@ -26,55 +27,63 @@
 
 namespace org::apache::nifi::minifi::controllers {
 
-const core::Property RocksDbPersistableKeyValueStoreService::LinkedServices(
-    core::PropertyBuilder::createProperty("Linked Services")
-    ->withDescription("Referenced Controller Services")
-    ->build());
-const core::Property RocksDbPersistableKeyValueStoreService::AlwaysPersist(
-    core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AlwaysPersistPropertyName)
+const core::Property RocksDbStateStorage::AlwaysPersist(
+    core::PropertyBuilder::createProperty(ALWAYS_PERSIST_PROPERTY_NAME)
     ->withDescription("Persist every change instead of persisting it periodically.")
     ->isRequired(false)
     ->withDefaultValue<bool>(false)
     ->build());
-const core::Property RocksDbPersistableKeyValueStoreService::AutoPersistenceInterval(
-    core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AutoPersistenceIntervalPropertyName)
+const core::Property RocksDbStateStorage::AutoPersistenceInterval(
+    core::PropertyBuilder::createProperty(AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME)
     ->withDescription("The interval of the periodic task persisting all values. Only used if Always Persist is false. If set to 0 seconds, auto persistence will be disabled.")
     ->isRequired(false)
     ->withDefaultValue<core::TimePeriodValue>("1 min")
     ->build());
-const core::Property RocksDbPersistableKeyValueStoreService::Directory(
+const core::Property RocksDbStateStorage::Directory(
     core::PropertyBuilder::createProperty("Directory")
     ->withDescription("Path to a directory for the database")
     ->isRequired(true)
     ->build());
 
-RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
-    : PersistableKeyValueStoreService(name, uuid)
-    , AbstractAutoPersistingKeyValueStoreService(std::move(name), uuid) {
+RocksDbStateStorage::RocksDbStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
+    : KeyValueStateStorage(name, uuid) {
 }
 
-void RocksDbPersistableKeyValueStoreService::initialize() {
+RocksDbStateStorage::~RocksDbStateStorage() {
+  auto_persistor_.stop();
+}
+
+void RocksDbStateStorage::initialize() {
   ControllerService::initialize();
   setSupportedProperties(properties());
 }
 
-void RocksDbPersistableKeyValueStoreService::onEnable() {
+void RocksDbStateStorage::onEnable() {
   if (configuration_ == nullptr) {
-    logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
+    logger_->log_debug("Cannot enable RocksDbStateStorage");
     return;
   }
 
-  AbstractAutoPersistingKeyValueStoreService::onEnable();
+  const auto always_persist = getProperty<bool>(AlwaysPersist.getName()).value_or(false);
+  logger_->log_info("Always Persist property: %s", always_persist ? "true" : "false");
+
+  const auto auto_persistence_interval = getProperty<core::TimePeriodValue>(AutoPersistenceInterval.getName()).value_or(core::TimePeriodValue{}).getMilliseconds();
+  logger_->log_info("Auto Persistence Interval property: %" PRId64 " ms", auto_persistence_interval.count());
 
   if (!getProperty(Directory.getName(), directory_)) {
     logger_->log_error("Invalid or missing property: Directory");
     return;
   }
 
+  auto_persistor_.start(always_persist, auto_persistence_interval, [this] { return persistNonVirtual(); });
+
   db_.reset();
 
-  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
-  logger_->log_info("Using %s RocksDbPersistableKeyValueStoreService", encrypted_env ? "encrypted" : "plaintext");
+  auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
+  if (!encrypted_env) {
+    encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME_OLD});
+  }
+  logger_->log_info("Using %s RocksDbStateStorage", encrypted_env ? "encrypted" : "plaintext");
 
   auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& db_opts) {
     db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
@@ -100,20 +109,19 @@
     return;
   }
 
-  if (always_persist_) {
+  if (auto_persistor_.isAlwaysPersisting()) {
     default_write_options.sync = true;
   }
 
-  logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService");
+  logger_->log_trace("Enabled RocksDbStateStorage");
 }
 
-void RocksDbPersistableKeyValueStoreService::notifyStop() {
-  AbstractAutoPersistingKeyValueStoreService::notifyStop();
-
+void RocksDbStateStorage::notifyStop() {
+  auto_persistor_.stop();
   db_.reset();
 }
 
-bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
+bool RocksDbStateStorage::set(const std::string& key, const std::string& value) {
   if (!db_) {
     return false;
   }
@@ -129,7 +137,7 @@
   return true;
 }
 
-bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::string& value) {
+bool RocksDbStateStorage::get(const std::string& key, std::string& value) {
   if (!db_) {
     return false;
   }
@@ -149,7 +157,7 @@
   return true;
 }
 
-bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
+bool RocksDbStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
   if (!db_) {
     return false;
   }
@@ -169,7 +177,7 @@
   return true;
 }
 
-bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
+bool RocksDbStateStorage::remove(const std::string& key) {
   if (!db_) {
     return false;
   }
@@ -185,7 +193,7 @@
   return true;
 }
 
-bool RocksDbPersistableKeyValueStoreService::clear() {
+bool RocksDbStateStorage::clear() {
   if (!db_) {
     return false;
   }
@@ -208,7 +216,7 @@
   return true;
 }
 
-bool RocksDbPersistableKeyValueStoreService::update(const std::string& /*key*/, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& /*update_func*/) {
+bool RocksDbStateStorage::update(const std::string& /*key*/, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& /*update_func*/) {
   if (!db_) {
     return false;
   }
@@ -219,7 +227,7 @@
   throw std::logic_error("Unsupported method");
 }
 
-bool RocksDbPersistableKeyValueStoreService::persist() {
+bool RocksDbStateStorage::persistNonVirtual() {
   if (!db_) {
     return false;
   }
@@ -227,12 +235,12 @@
   if (!opendb) {
     return false;
   }
-  if (always_persist_) {
+  if (auto_persistor_.isAlwaysPersisting()) {
     return true;
   }
   return opendb->FlushWAL(true /*sync*/).ok();
 }
 
-REGISTER_RESOURCE_AS(RocksDbPersistableKeyValueStoreService, ControllerService, ("RocksDbPersistableKeyValueStoreService", "rocksdbpersistablekeyvaluestoreservice"));
+REGISTER_RESOURCE_AS(RocksDbStateStorage, ControllerService, ("RocksDbPersistableKeyValueStoreService", "rocksdbpersistablekeyvaluestoreservice", "RocksDbStateStorage"));
 
 }  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
similarity index 74%
rename from extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
rename to extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
index 4c6a9a2..69e290e 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
+++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
@@ -20,7 +20,8 @@
 #include <string>
 #include <memory>
 
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "controllers/keyvalue/AutoPersistor.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
 #include "core/Core.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -30,27 +31,28 @@
 
 namespace org::apache::nifi::minifi::controllers {
 
-class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService {
+class RocksDbStateStorage : public KeyValueStateStorage {
  public:
-  static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.state.management.provider.local.encryption.key";
+  static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.state.storage.local.encryption.key";
+  static constexpr const char* ENCRYPTION_KEY_NAME_OLD = "nifi.state.management.provider.local.encryption.key";
 
-  explicit RocksDbPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
+  explicit RocksDbStateStorage(const std::string& name, const utils::Identifier& uuid = {});
 
-  ~RocksDbPersistableKeyValueStoreService() override = default;
+  ~RocksDbStateStorage() override;
 
-  EXTENSIONAPI static constexpr const char* Description = "A key-value service implemented by RocksDB";
-  EXTENSIONAPI static const core::Property LinkedServices;
+  EXTENSIONAPI static constexpr const char* Description = "A state storage service implemented by RocksDB";
+
   EXTENSIONAPI static const core::Property AlwaysPersist;
   EXTENSIONAPI static const core::Property AutoPersistenceInterval;
   EXTENSIONAPI static const core::Property Directory;
   static auto properties() {
     return std::array{
-      LinkedServices,
       AlwaysPersist,
       AutoPersistenceInterval,
       Directory
     };
   }
+
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
 
@@ -59,27 +61,25 @@
   void notifyStop() override;
 
   bool set(const std::string& key, const std::string& value) override;
-
   bool get(const std::string& key, std::string& value) override;
-
   bool get(std::unordered_map<std::string, std::string>& kvs) override;
-
   bool remove(const std::string& key) override;
-
   bool clear() override;
-
   bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
-
-  bool persist() override;
-
- protected:
-  std::string directory_;
-
-  std::unique_ptr<minifi::internal::RocksDatabase> db_;
-  rocksdb::WriteOptions default_write_options;
+  bool persist() override {
+    return persistNonVirtual();
+  }
 
  private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger();
+  // non-virtual to allow calling on AutoPersistor's thread during destruction
+  bool persistNonVirtual();
+
+  std::string directory_;
+  std::unique_ptr<minifi::internal::RocksDatabase> db_;
+  rocksdb::WriteOptions default_write_options;
+  AutoPersistor auto_persistor_;
+
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbStateStorage>::getLogger();
 };
 
 }  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
index 85e8cea..d4725b4 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -33,7 +33,7 @@
 #include "core/Property.h"
 #include "utils/ArrayUtils.h"
 #include "utils/Id.h"
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
 #include "utils/RegexUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -119,7 +119,7 @@
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
  private:
-  core::CoreComponentStateManager* state_manager_{};
+  core::StateManager* state_manager_{};
   std::string listing_strategy_;
   bool search_recursively_{};
   bool follow_symlink_{};
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
index f510e09..7663253 100644
--- a/extensions/sql/processors/QueryDatabaseTable.h
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -83,7 +83,7 @@
 
   bool saveState();
 
-  core::CoreComponentStateManager* state_manager_{};
+  core::StateManager* state_manager_{};
   std::string table_name_;
   std::unordered_set<sql::SQLColumnIdentifier> return_columns_;
   std::string queried_columns_;
diff --git a/extensions/standard-processors/controllers/InMemoryKeyValueStorage.cpp b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.cpp
new file mode 100644
index 0000000..55bc50d
--- /dev/null
+++ b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.cpp
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "InMemoryKeyValueStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+bool InMemoryKeyValueStorage::set(const std::string& key, const std::string& value) {
+  map_[key] = value;
+  return true;
+}
+
+bool InMemoryKeyValueStorage::get(const std::string& key, std::string& value) {
+  auto it = map_.find(key);
+  if (it == map_.end()) {
+    return false;
+  } else {
+    value = it->second;
+    return true;
+  }
+}
+
+bool InMemoryKeyValueStorage::get(std::unordered_map<std::string, std::string>& kvs) {
+  kvs = map_;
+  return true;
+}
+
+bool InMemoryKeyValueStorage::remove(const std::string& key) {
+  return map_.erase(key) == 1U;
+}
+
+bool InMemoryKeyValueStorage::clear() {
+  map_.clear();
+  return true;
+}
+
+bool InMemoryKeyValueStorage::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  bool exists = false;
+  std::string value;
+  auto it = map_.find(key);
+  if (it != map_.end()) {
+    exists = true;
+    value = it->second;
+  }
+  try {
+    if (!update_func(exists, value)) {
+      return false;
+    }
+  } catch (const std::exception& e) {
+    logger_->log_error("update_func failed with an exception: %s", e.what());
+    return false;
+  } catch (...) {
+    logger_->log_error("update_func failed with an exception");
+    return false;
+  }
+  if (!exists) {
+    it = map_.emplace(key, "").first;
+  }
+  it->second = std::move(value);
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/InMemoryKeyValueStorage.h b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.h
new file mode 100644
index 0000000..1ca5036
--- /dev/null
+++ b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.h
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma  once
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerFactory.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class InMemoryKeyValueStorage {
+ public:
+  InMemoryKeyValueStorage() = default;
+  explicit InMemoryKeyValueStorage(std::unordered_map<std::string, std::string> contents) : map_{std::move(contents)} {}
+
+  bool set(const std::string& key, const std::string& value);
+  bool get(const std::string& key, std::string& value);
+  bool get(std::unordered_map<std::string, std::string>& kvs);
+  bool remove(const std::string& key);
+  bool clear();
+  bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func);
+
+ private:
+  std::unordered_map<std::string, std::string> map_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<InMemoryKeyValueStorage>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/PersistentMapStateStorage.cpp b/extensions/standard-processors/controllers/PersistentMapStateStorage.cpp
new file mode 100644
index 0000000..d771a53
--- /dev/null
+++ b/extensions/standard-processors/controllers/PersistentMapStateStorage.cpp
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PersistentMapStateStorage.h"
+
+#include <cinttypes>
+#include <fstream>
+#include <set>
+
+#include "utils/file/FileUtils.h"
+#include "utils/StringUtils.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+
+namespace {
+  std::string escape(const std::string& str) {
+    std::stringstream escaped;
+    for (const auto c : str) {
+      switch (c) {
+        case '\\':
+          escaped << "\\\\";
+          break;
+        case '\n':
+          escaped << "\\n";
+          break;
+        case '=':
+          escaped << "\\=";
+          break;
+        default:
+          escaped << c;
+          break;
+      }
+    }
+    return escaped.str();
+  }
+}  // namespace
+
+namespace org::apache::nifi::minifi::controllers {
+
+const core::Property PersistentMapStateStorage::AlwaysPersist(
+    core::PropertyBuilder::createProperty(ALWAYS_PERSIST_PROPERTY_NAME)
+    ->withDescription("Persist every change instead of persisting it periodically.")
+    ->isRequired(false)
+    ->withDefaultValue<bool>(false)
+    ->build());
+const core::Property PersistentMapStateStorage::AutoPersistenceInterval(
+    core::PropertyBuilder::createProperty(AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME)
+    ->withDescription("The interval of the periodic task persisting all values. Only used if Always Persist is false. If set to 0 seconds, auto persistence will be disabled.")
+    ->isRequired(false)
+    ->withDefaultValue<core::TimePeriodValue>("1 min")
+    ->build());
+const core::Property PersistentMapStateStorage::File(
+    core::PropertyBuilder::createProperty("File")
+    ->withDescription("Path to a file to store state")
+    ->isRequired(true)
+    ->build());
+
+PersistentMapStateStorage::PersistentMapStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
+    : KeyValueStateStorage(name, uuid) {
+}
+
+PersistentMapStateStorage::PersistentMapStateStorage(const std::string& name, const std::shared_ptr<Configure> &configuration)
+    : KeyValueStateStorage(name) {
+  setConfiguration(configuration);
+}
+
+PersistentMapStateStorage::~PersistentMapStateStorage() {
+  auto_persistor_.stop();
+  persistNonVirtual();
+}
+
+bool PersistentMapStateStorage::parseLine(const std::string& line, std::string& key, std::string& value) {
+  std::stringstream key_ss;
+  std::stringstream value_ss;
+  bool in_escape_sequence = false;
+  bool key_complete = false;
+  for (const auto c : line) {
+    auto& current = key_complete ? value_ss : key_ss;
+    if (in_escape_sequence) {
+      switch (c) {
+        case '\\':
+          current << '\\';
+          break;
+        case 'n':
+          current << '\n';
+          break;
+        case '=':
+          current << '=';
+          break;
+        default:
+          logger_->log_error(R"(Invalid escape sequence in "%s": "\%c")", line.c_str(), c);
+          return false;
+      }
+      in_escape_sequence = false;
+    } else {
+      if (c == '\\') {
+        in_escape_sequence = true;
+      } else if (c == '=') {
+        if (key_complete) {
+          logger_->log_error(R"(Unterminated '=' in line "%s")", line.c_str());
+          return false;
+        } else {
+          key_complete = true;
+        }
+      } else {
+        current << c;
+      }
+    }
+  }
+  if (in_escape_sequence) {
+    logger_->log_error("Unterminated escape sequence in \"%s\"", line.c_str());
+    return false;
+  }
+  if (!key_complete) {
+    logger_->log_error("Key not found in \"%s\"", line.c_str());
+    return false;
+  }
+  key = key_ss.str();
+  if (key.empty()) {
+    logger_->log_error(R"(Line with empty key found in "%s": "%s")", file_.c_str(), line.c_str());
+    return false;
+  }
+  value = value_ss.str();
+  return true;
+}
+
+void PersistentMapStateStorage::initialize() {
+  // VolatileMapStateStorage::initialize() also calls setSupportedProperties, and we don't want that
+  ControllerService::initialize();  // NOLINT(bugprone-parent-virtual-call)
+  setSupportedProperties(properties());
+}
+
+void PersistentMapStateStorage::onEnable() {
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable PersistentMapStateStorage");
+    return;
+  }
+
+  const auto always_persist = getProperty<bool>(AlwaysPersist.getName()).value_or(false);
+  logger_->log_info("Always Persist property: %s", always_persist ? "true" : "false");
+
+  const auto auto_persistence_interval = getProperty<core::TimePeriodValue>(AutoPersistenceInterval.getName()).value_or(core::TimePeriodValue{}).getMilliseconds();
+  logger_->log_info("Auto Persistence Interval property: %" PRId64 " ms", auto_persistence_interval.count());
+
+  if (!getProperty(File.getName(), file_)) {
+    logger_->log_error("Invalid or missing property: File");
+    return;
+  }
+
+  /* We must not start the persistence thread until we attempted to load the state */
+  load();
+
+  auto_persistor_.start(always_persist, auto_persistence_interval, [this] { return persistNonVirtual(); });
+
+  logger_->log_trace("Enabled PersistentMapStateStorage");
+}
+
+void PersistentMapStateStorage::notifyStop() {
+  auto_persistor_.stop();
+  persist();
+}
+
+bool PersistentMapStateStorage::set(const std::string& key, const std::string& value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  bool res = storage_.set(key, value);
+  if (auto_persistor_.isAlwaysPersisting() && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool PersistentMapStateStorage::get(const std::string& key, std::string& value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.get(key, value);
+}
+
+bool PersistentMapStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.get(kvs);
+}
+
+bool PersistentMapStateStorage::remove(const std::string& key) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  bool res = storage_.remove(key);
+  if (auto_persistor_.isAlwaysPersisting() && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool PersistentMapStateStorage::clear() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  bool res = storage_.clear();
+  if (auto_persistor_.isAlwaysPersisting() && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool PersistentMapStateStorage::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  bool res = storage_.update(key, update_func);
+  if (auto_persistor_.isAlwaysPersisting() && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool PersistentMapStateStorage::persistNonVirtual() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  std::ofstream ofs(file_);
+  if (!ofs.is_open()) {
+    logger_->log_error("Failed to open file \"%s\" to store state", file_.c_str());
+    return false;
+  }
+  std::unordered_map<std::string, std::string> storage_copy;
+  if (!storage_.get(storage_copy)) {
+    logger_->log_error("Could not read the contents of the in-memory storage");
+    return false;
+  }
+
+  ofs << escape(FORMAT_VERSION_KEY) << "=" << escape(std::to_string(FORMAT_VERSION)) << "\n";
+
+  for (const auto& kv : storage_copy) {
+    ofs << escape(kv.first) << "=" << escape(kv.second) << "\n";
+  }
+  return true;
+}
+
+bool PersistentMapStateStorage::load() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  std::ifstream ifs(file_);
+  if (!ifs.is_open()) {
+    logger_->log_debug("Failed to open file \"%s\" to load state", file_.c_str());
+    return false;
+  }
+  std::unordered_map<std::string, std::string> map;
+  std::string line;
+  while (std::getline(ifs, line)) {
+    std::string key;
+    std::string value;
+    if (!parseLine(line, key, value)) {
+      continue;
+    }
+    if (key == FORMAT_VERSION_KEY) {
+      int format_version = 0;
+      try {
+        format_version = std::stoi(value);
+      } catch (...) {
+        logger_->log_error(R"(Invalid format version number found in "%s": "%s")", file_.c_str(), value.c_str());
+        return false;
+      }
+      if (format_version > FORMAT_VERSION) {
+        logger_->log_error("\"%s\" has been serialized with a larger format version than currently known: %d > %d", file_.c_str(), format_version, FORMAT_VERSION);
+        return false;
+      }
+    } else {
+        map[key] = value;
+    }
+  }
+
+  storage_ = InMemoryKeyValueStorage{std::move(map)};
+  logger_->log_debug("Loaded state from \"%s\"", file_.c_str());
+  return true;
+}
+
+REGISTER_RESOURCE_AS(PersistentMapStateStorage, ControllerService, ("UnorderedMapPersistableKeyValueStoreService", "PersistentMapStateStorage"));
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h b/extensions/standard-processors/controllers/PersistentMapStateStorage.h
similarity index 63%
rename from extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
rename to extensions/standard-processors/controllers/PersistentMapStateStorage.h
index d0675c2..8d44964 100644
--- a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
+++ b/extensions/standard-processors/controllers/PersistentMapStateStorage.h
@@ -22,37 +22,36 @@
 #include <memory>
 #include <utility>
 
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
-#include "UnorderedMapKeyValueStoreService.h"
+#include "controllers/keyvalue/AutoPersistor.h"
 #include "core/Core.h"
 #include "properties/Configure.h"
+#include "InMemoryKeyValueStorage.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 
 namespace org::apache::nifi::minifi::controllers {
 
-class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService,
-                                                    public UnorderedMapKeyValueStoreService {
+class PersistentMapStateStorage : public KeyValueStateStorage {
  public:
-  explicit UnorderedMapPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-  explicit UnorderedMapPersistableKeyValueStoreService(std::string name, const std::shared_ptr<Configure>& configuration);
+  explicit PersistentMapStateStorage(const std::string& name, const utils::Identifier& uuid = {});
+  explicit PersistentMapStateStorage(const std::string& name, const std::shared_ptr<Configure>& configuration);
 
-  ~UnorderedMapPersistableKeyValueStoreService() override;
+  ~PersistentMapStateStorage() override;
 
-  EXTENSIONAPI static constexpr const char* Description = "A persistable key-value service implemented by a locked std::unordered_map<std::string, std::string> and persisted into a file";
-  EXTENSIONAPI static const core::Property LinkedServices;
+  EXTENSIONAPI static constexpr const char* Description = "A persistable state storage service implemented by a locked std::unordered_map<std::string, std::string> and persisted into a file";
+
   EXTENSIONAPI static const core::Property AlwaysPersist;
   EXTENSIONAPI static const core::Property AutoPersistenceInterval;
-  EXTENSIONAPI static const core::Property Directory;
   EXTENSIONAPI static const core::Property File;
   static auto properties() {
     return std::array{
-      LinkedServices,
       AlwaysPersist,
       AutoPersistenceInterval,
       File
     };
   }
+
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
 
@@ -61,39 +60,31 @@
   void notifyStop() override;
 
   bool set(const std::string& key, const std::string& value) override;
-
+  bool get(const std::string& key, std::string& value) override;
+  bool get(std::unordered_map<std::string, std::string>& kvs) override;
   bool remove(const std::string& key) override;
-
   bool clear() override;
-
   bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
 
   bool persist() override {
     return persistNonVirtual();
   }
 
- protected:
-  using AbstractAutoPersistingKeyValueStoreService::getImpl;
-  using AbstractAutoPersistingKeyValueStoreService::setImpl;
-  using AbstractAutoPersistingKeyValueStoreService::persistImpl;
-  using AbstractAutoPersistingKeyValueStoreService::removeImpl;
-
-
+ private:
   static constexpr const char* FORMAT_VERSION_KEY = "__UnorderedMapPersistableKeyValueStoreService_FormatVersion";
   static constexpr int FORMAT_VERSION = 1;
 
-  std::string file_;
-
   bool load();
-
   bool parseLine(const std::string& line, std::string& key, std::string& value);
 
- private:
+  // non-virtual to allow calling in destructor
   bool persistNonVirtual();
 
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger();
+  std::mutex mutex_;
+  std::string file_;
+  InMemoryKeyValueStorage storage_;
+  AutoPersistor auto_persistor_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PersistentMapStateStorage>::getLogger();
 };
 
-static_assert(std::is_convertible<UnorderedMapKeyValueStoreService*, PersistableKeyValueStoreService*>::value, "UnorderedMapKeyValueStoreService is a PersistableKeyValueStoreService");
-
 }  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
deleted file mode 100644
index ffbd1b2..0000000
--- a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "UnorderedMapKeyValueStoreService.h"
-#include "core/PropertyBuilder.h"
-#include "core/Resource.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-const core::Property UnorderedMapKeyValueStoreService::LinkedServices(
-    core::PropertyBuilder::createProperty("Linked Services")
-    ->withDescription("Referenced Controller Services")
-    ->build());
-
-UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
-    : PersistableKeyValueStoreService(std::move(name), uuid) {
-}
-
-UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(std::string name, const std::shared_ptr<Configure> &configuration)
-    : PersistableKeyValueStoreService(std::move(name)) {
-  setConfiguration(configuration);
-}
-
-UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() = default;
-
-bool UnorderedMapKeyValueStoreService::set(const std::string& key, const std::string& value) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  map_[key] = value;
-  return true;
-}
-
-bool UnorderedMapKeyValueStoreService::get(const std::string& key, std::string& value) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  auto it = map_.find(key);
-  if (it == map_.end()) {
-    return false;
-  } else {
-    value = it->second;
-    return true;
-  }
-}
-
-bool UnorderedMapKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  kvs = map_;
-  return true;
-}
-
-bool UnorderedMapKeyValueStoreService::remove(const std::string& key) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  return map_.erase(key) == 1U;
-}
-
-bool UnorderedMapKeyValueStoreService::clear() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  map_.clear();
-  return true;
-}
-
-void UnorderedMapKeyValueStoreService::initialize() {
-  ControllerService::initialize();
-  setSupportedProperties(properties());
-}
-
-bool UnorderedMapKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  bool exists = false;
-  std::string value;
-  auto it = map_.find(key);
-  if (it != map_.end()) {
-    exists = true;
-    value = it->second;
-  }
-  try {
-    if (!update_func(exists, value)) {
-      return false;
-    }
-  } catch (const std::exception& e) {
-    logger_->log_error("update_func failed with an exception: %s", e.what());
-    return false;
-  } catch (...) {
-    logger_->log_error("update_func failed with an exception");
-    return false;
-  }
-  if (!exists) {
-    it = map_.emplace(key, "").first;
-  }
-  it->second = std::move(value);
-  return true;
-}
-
-REGISTER_RESOURCE(UnorderedMapKeyValueStoreService, ControllerService);
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
deleted file mode 100644
index 309a53a..0000000
--- a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "UnorderedMapPersistableKeyValueStoreService.h"
-
-#include <fstream>
-#include <set>
-
-#include "utils/file/FileUtils.h"
-#include "utils/StringUtils.h"
-#include "core/PropertyBuilder.h"
-#include "core/Resource.h"
-
-namespace {
-  std::string escape(const std::string& str) {
-    std::stringstream escaped;
-    for (const auto c : str) {
-      switch (c) {
-        case '\\':
-          escaped << "\\\\";
-          break;
-        case '\n':
-          escaped << "\\n";
-          break;
-        case '=':
-          escaped << "\\=";
-          break;
-        default:
-          escaped << c;
-          break;
-      }
-    }
-    return escaped.str();
-  }
-}  // namespace
-
-namespace org::apache::nifi::minifi::controllers {
-
-const core::Property UnorderedMapPersistableKeyValueStoreService::LinkedServices(
-    core::PropertyBuilder::createProperty("Linked Services")
-    ->withDescription("Referenced Controller Services")
-    ->build());
-const core::Property UnorderedMapPersistableKeyValueStoreService::AlwaysPersist(
-    core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AlwaysPersistPropertyName)
-    ->withDescription("Persist every change instead of persisting it periodically.")
-    ->isRequired(false)
-    ->withDefaultValue<bool>(false)
-    ->build());
-const core::Property UnorderedMapPersistableKeyValueStoreService::AutoPersistenceInterval(
-    core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AutoPersistenceIntervalPropertyName)
-    ->withDescription("The interval of the periodic task persisting all values. Only used if Always Persist is false. If set to 0 seconds, auto persistence will be disabled.")
-    ->isRequired(false)
-    ->withDefaultValue<core::TimePeriodValue>("1 min")
-    ->build());
-const core::Property UnorderedMapPersistableKeyValueStoreService::File(
-    core::PropertyBuilder::createProperty("File")
-    ->withDescription("Path to a file to store state")
-    ->isRequired(true)
-    ->build());
-
-UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
-    : PersistableKeyValueStoreService(name, uuid)
-    , AbstractAutoPersistingKeyValueStoreService(name, uuid)
-    , UnorderedMapKeyValueStoreService(std::move(name), uuid) {
-}
-
-UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(std::string name, const std::shared_ptr<Configure> &configuration)
-    : PersistableKeyValueStoreService(name)
-    , AbstractAutoPersistingKeyValueStoreService(name)
-    , UnorderedMapKeyValueStoreService(std::move(name)) {
-  setConfiguration(configuration);
-}
-
-UnorderedMapPersistableKeyValueStoreService::~UnorderedMapPersistableKeyValueStoreService() {
-  persistNonVirtual();
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::parseLine(const std::string& line, std::string& key, std::string& value) {
-  std::stringstream key_ss;
-  std::stringstream value_ss;
-  bool in_escape_sequence = false;
-  bool key_complete = false;
-  for (const auto c : line) {
-    auto& current = key_complete ? value_ss : key_ss;
-    if (in_escape_sequence) {
-      switch (c) {
-        case '\\':
-          current << '\\';
-          break;
-        case 'n':
-          current << '\n';
-          break;
-        case '=':
-          current << '=';
-          break;
-        default:
-          logger_->log_error(R"(Invalid escape sequence in "%s": "\%c")", line.c_str(), c);
-          return false;
-      }
-      in_escape_sequence = false;
-    } else {
-      if (c == '\\') {
-        in_escape_sequence = true;
-      } else if (c == '=') {
-        if (key_complete) {
-          logger_->log_error(R"(Unterminated '=' in line "%s")", line.c_str());
-          return false;
-        } else {
-          key_complete = true;
-        }
-      } else {
-        current << c;
-      }
-    }
-  }
-  if (in_escape_sequence) {
-    logger_->log_error("Unterminated escape sequence in \"%s\"", line.c_str());
-    return false;
-  }
-  if (!key_complete) {
-    logger_->log_error("Key not found in \"%s\"", line.c_str());
-    return false;
-  }
-  key = key_ss.str();
-  if (key.empty()) {
-    logger_->log_error(R"(Line with empty key found in "%s": "%s")", file_.c_str(), line.c_str());
-    return false;
-  }
-  value = value_ss.str();
-  return true;
-}
-
-void UnorderedMapPersistableKeyValueStoreService::initialize() {
-  // UnorderedMapKeyValueStoreService::initialize() also calls setSupportedProperties, and we don't want that
-  ControllerService::initialize();  // NOLINT(bugprone-parent-virtual-call)
-  setSupportedProperties(properties());
-}
-
-void UnorderedMapPersistableKeyValueStoreService::onEnable() {
-  if (configuration_ == nullptr) {
-    logger_->log_debug("Cannot enable UnorderedMapPersistableKeyValueStoreService");
-    return;
-  }
-
-  if (!getProperty(File.getName(), file_)) {
-    logger_->log_error("Invalid or missing property: File");
-    return;
-  }
-
-  /* We must not start the persistence thread until we attempted to load the state */
-  load();
-
-  AbstractAutoPersistingKeyValueStoreService::onEnable();
-
-  logger_->log_trace("Enabled UnorderedMapPersistableKeyValueStoreService");
-}
-
-void UnorderedMapPersistableKeyValueStoreService::notifyStop() {
-  AbstractAutoPersistingKeyValueStoreService::notifyStop();
-  persist();
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  bool res = UnorderedMapKeyValueStoreService::set(key, value);
-  if (always_persist_ && res) {
-    return persist();
-  }
-  return res;
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::remove(const std::string& key) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  bool res = UnorderedMapKeyValueStoreService::remove(key);
-  if (always_persist_ && res) {
-    return persist();
-  }
-  return res;
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::clear() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  bool res = UnorderedMapKeyValueStoreService::clear();
-  if (always_persist_ && res) {
-    return persist();
-  }
-  return res;
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  bool res = UnorderedMapKeyValueStoreService::update(key, update_func);
-  if (always_persist_ && res) {
-    return persist();
-  }
-  return res;
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::persistNonVirtual() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  std::ofstream ofs(file_);
-  if (!ofs.is_open()) {
-    logger_->log_error("Failed to open file \"%s\" to store state", file_.c_str());
-    return false;
-  }
-  ofs << escape(FORMAT_VERSION_KEY) << "=" << escape(std::to_string(FORMAT_VERSION)) << "\n";
-  for (const auto& kv : map_) {
-    ofs << escape(kv.first) << "=" << escape(kv.second) << "\n";
-  }
-  return true;
-}
-
-bool UnorderedMapPersistableKeyValueStoreService::load() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
-  std::ifstream ifs(file_);
-  if (!ifs.is_open()) {
-    logger_->log_debug("Failed to open file \"%s\" to load state", file_.c_str());
-    return false;
-  }
-  std::unordered_map<std::string, std::string> map;
-  std::string line;
-  while (std::getline(ifs, line)) {
-    std::string key;
-    std::string value;
-    if (!parseLine(line, key, value)) {
-      continue;
-    }
-    if (key == FORMAT_VERSION_KEY) {
-      int format_version = 0;
-      try {
-        format_version = std::stoi(value);
-      } catch (...) {
-        logger_->log_error(R"(Invalid format version number found in "%s": "%s")", file_.c_str(), value.c_str());
-        return false;
-      }
-      if (format_version > FORMAT_VERSION) {
-        logger_->log_error("\"%s\" has been serialized with a larger format version than currently known: %d > %d", file_.c_str(), format_version, FORMAT_VERSION);
-        return false;
-      }
-    } else {
-        map[key] = value;
-    }
-  }
-  map_ = std::move(map);
-  logger_->log_debug("Loaded state from \"%s\"", file_.c_str());
-  return true;
-}
-
-REGISTER_RESOURCE(UnorderedMapPersistableKeyValueStoreService, ControllerService);
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/VolatileMapStateStorage.cpp b/extensions/standard-processors/controllers/VolatileMapStateStorage.cpp
new file mode 100644
index 0000000..40608cf
--- /dev/null
+++ b/extensions/standard-processors/controllers/VolatileMapStateStorage.cpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VolatileMapStateStorage.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+const core::Property VolatileMapStateStorage::LinkedServices(
+    core::PropertyBuilder::createProperty("Linked Services")
+    ->withDescription("Referenced Controller Services")
+    ->build());
+
+VolatileMapStateStorage::VolatileMapStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
+    : KeyValueStateStorage(name, uuid) {
+}
+
+VolatileMapStateStorage::VolatileMapStateStorage(const std::string& name, const std::shared_ptr<Configure> &configuration)
+    : KeyValueStateStorage(name) {
+  setConfiguration(configuration);
+}
+
+void VolatileMapStateStorage::initialize() {
+  ControllerService::initialize();
+  setSupportedProperties(properties());
+}
+
+bool VolatileMapStateStorage::set(const std::string& key, const std::string& value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.set(key, value);
+}
+
+bool VolatileMapStateStorage::get(const std::string& key, std::string& value) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.get(key, value);
+}
+
+bool VolatileMapStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.get(kvs);
+}
+
+bool VolatileMapStateStorage::remove(const std::string& key) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.remove(key);
+}
+
+bool VolatileMapStateStorage::clear() {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.clear();
+}
+
+bool VolatileMapStateStorage::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  std::lock_guard<std::mutex> lock(mutex_);
+  return storage_.update(key, update_func);
+}
+
+REGISTER_RESOURCE_AS(VolatileMapStateStorage, ControllerService, ("UnorderedMapKeyValueStoreService", "VolatileMapStateStorage"));
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h b/extensions/standard-processors/controllers/VolatileMapStateStorage.h
similarity index 75%
rename from extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
rename to extensions/standard-processors/controllers/VolatileMapStateStorage.h
index baa898d..952f858 100644
--- a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
+++ b/extensions/standard-processors/controllers/VolatileMapStateStorage.h
@@ -22,22 +22,20 @@
 #include <memory>
 #include <utility>
 
-#include "controllers/keyvalue/KeyValueStoreService.h"
 #include "core/Core.h"
 #include "properties/Configure.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+#include "InMemoryKeyValueStorage.h"
 
 namespace org::apache::nifi::minifi::controllers {
 
-/// Key-value store service purely in RAM without disk usage
-class UnorderedMapKeyValueStoreService : virtual public PersistableKeyValueStoreService {
+/// Key-value state storage purely in RAM without disk usage
+class VolatileMapStateStorage : virtual public KeyValueStateStorage {
  public:
-  explicit UnorderedMapKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-  explicit UnorderedMapKeyValueStoreService(std::string name, const std::shared_ptr<Configure>& configuration);
-
-  ~UnorderedMapKeyValueStoreService() override;
+  explicit VolatileMapStateStorage(const std::string& name, const utils::Identifier& uuid = {});
+  explicit VolatileMapStateStorage(const std::string& name, const std::shared_ptr<Configure>& configuration);
 
   EXTENSIONAPI static constexpr const char* Description = "A key-value service implemented by a locked std::unordered_map<std::string, std::string>";
   EXTENSIONAPI static const core::Property LinkedServices;
@@ -45,23 +43,23 @@
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
 
+  void initialize() override;
+
   bool set(const std::string& key, const std::string& value) override;
   bool get(const std::string& key, std::string& value) override;
   bool get(std::unordered_map<std::string, std::string>& kvs) override;
   bool remove(const std::string& key) override;
   bool clear() override;
-  void initialize() override;
   bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+
   bool persist() override {
     return true;
   }
 
- protected:
-  std::unordered_map<std::string, std::string> map_;
-  std::recursive_mutex mutex_;
-
  private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger();
+  std::mutex mutex_;
+  InMemoryKeyValueStorage storage_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<VolatileMapStateStorage>::getLogger();
 };
 
 }  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 49297c9..286ec6b 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -201,7 +201,7 @@
   static const int BUFFER_SIZE = 512;
 
   std::string delimiter_;  // Delimiter for the data incoming from the tailed file.
-  core::CoreComponentStateManager* state_manager_ = nullptr;
+  core::StateManager* state_manager_ = nullptr;
   std::map<std::filesystem::path, TailState> tail_states_;
   Mode tail_mode_ = Mode::UNDEFINED;
   std::optional<utils::Regex> pattern_regex_;
diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h
index 25a7015..cebb315 100644
--- a/extensions/systemd/ConsumeJournald.h
+++ b/extensions/systemd/ConsumeJournald.h
@@ -29,7 +29,7 @@
 #include <utility>
 #include <vector>
 
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
 #include "core/Processor.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "libwrapper/LibWrapper.h"
@@ -112,7 +112,7 @@
 
   std::atomic<bool> running_{false};
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConsumeJournald>::getLogger(uuid_);
-  core::CoreComponentStateManager* state_manager_;
+  core::StateManager* state_manager_;
   std::unique_ptr<libwrapper::LibWrapper> libwrapper_;
   std::unique_ptr<utils::FifoExecutor> worker_;
   std::unique_ptr<libwrapper::Journal> journal_;
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index 300aef3..74ea223 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -35,7 +35,7 @@
     const std::filesystem::path& bookmarkRootDir,
     const utils::Identifier& uuid,
     bool processOldEvents,
-    core::CoreComponentStateManager* state_manager,
+    core::StateManager* state_manager,
     std::shared_ptr<core::logging::Logger> logger)
     : logger_(std::move(logger)),
       state_manager_(state_manager) {
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index 66937df..c2f07fb 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -41,7 +41,7 @@
       const std::filesystem::path& bookmarkRootDir,
       const utils::Identifier& uuid,
       bool processOldEvents,
-      core::CoreComponentStateManager* state_manager,
+      core::StateManager* state_manager,
       std::shared_ptr<core::logging::Logger> logger);
   ~Bookmark();
   explicit operator bool() const noexcept;
@@ -57,7 +57,7 @@
   using unique_evt_handle = wel::unique_evt_handle;
 
   std::shared_ptr<core::logging::Logger> logger_;
-  core::CoreComponentStateManager* state_manager_;
+  core::StateManager* state_manager_;
   std::filesystem::path filePath_;
   bool ok_{};
   unique_evt_handle hBookmark_;
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index e7530d6..ef43020 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -146,7 +146,7 @@
                                                     const EVT_HANDLE& event_query_results);
 
   std::shared_ptr<core::logging::Logger> logger_;
-  core::CoreComponentStateManager* state_manager_{nullptr};
+  core::StateManager* state_manager_{nullptr};
   wel::METADATA_NAMES header_names_;
   std::optional<std::string> header_delimiter_;
   std::string channel_;
diff --git a/extensions/windows-event-log/tests/BookmarkTests.cpp b/extensions/windows-event-log/tests/BookmarkTests.cpp
index eeb2c7b..139455d 100644
--- a/extensions/windows-event-log/tests/BookmarkTests.cpp
+++ b/extensions/windows-event-log/tests/BookmarkTests.cpp
@@ -40,7 +40,7 @@
 std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
                                          const std::wstring &channel,
                                          const utils::Identifier &uuid,
-                                         core::CoreComponentStateManager* state_manager) {
+                                         core::StateManager* state_manager) {
   const auto logger = test_plan.getLogger();
   return std::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
 }
@@ -103,7 +103,7 @@
 
 
   const utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
-  auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+  auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
   std::unique_ptr<Bookmark> bookmark = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
   REQUIRE(bookmark);
   REQUIRE(*bookmark);
@@ -120,7 +120,7 @@
   LogTestController::getInstance().setTrace<TestPlan>();
 
   utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
-  auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+  auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
   std::unique_ptr<Bookmark> bookmark_before = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
   std::wstring bookmark_xml_before = bookmarkAsXml(bookmark_before);
 
@@ -139,14 +139,14 @@
   LogTestController::getInstance().setTrace<TestPlan>();
 
   utils::Identifier uuid_one = IdGenerator::getIdGenerator()->generate();
-  auto state_manager_one = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_one);
+  auto state_manager_one = test_plan->getStateStorage()->getStateManager(uuid_one);
   std::unique_ptr<Bookmark> bookmark_before = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_one, state_manager_one.get());
 
   reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
 
   utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
   // different uuid, so we get a new, empty, state manager
-  auto state_manager_two = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_two);
+  auto state_manager_two = test_plan->getStateStorage()->getStateManager(uuid_two);
   std::unique_ptr<Bookmark> bookmark_after = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two, state_manager_two.get());
 
   REQUIRE(bookmarkAsXml(bookmark_before) != bookmarkAsXml(bookmark_after));
@@ -158,7 +158,7 @@
   LogTestController::getInstance().setTrace<TestPlan>();
 
   utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
-  auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+  auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
   std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
   std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
 
@@ -174,14 +174,14 @@
 
   GIVEN("We have two different bookmarks") {
     const auto uuid = IdGenerator::getIdGenerator()->generate();
-    auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+    auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
     std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
     std::wstring bookmark_one_xml = bookmarkAsXml(bookmark_one);
 
     reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
 
     const utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
-    auto state_manager_two = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_two);
+    auto state_manager_two = test_plan->getStateStorage()->getStateManager(uuid_two);
     std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two, state_manager_two.get());
     std::wstring bookmark_two_xml = bookmarkAsXml(bookmark_two);
 
@@ -208,7 +208,7 @@
   LogTestController::getInstance().setTrace<TestPlan>();
 
   const utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
-  auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+  auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
   std::unique_ptr<Bookmark> bookmark = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
   std::wstring bookmark_xml_before = bookmarkAsXml(bookmark);
 
@@ -230,13 +230,13 @@
 
   GIVEN("We have two different bookmarks with two different state managers") {
     utils::Identifier uuid_one = IdGenerator::getIdGenerator()->generate();
-    auto state_manager_one = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_one);
+    auto state_manager_one = test_plan->getStateStorage()->getStateManager(uuid_one);
     std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_one, state_manager_one.get());
 
     reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
 
     utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
-    auto state_manager_two = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_two);
+    auto state_manager_two = test_plan->getStateStorage()->getStateManager(uuid_two);
     std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two, state_manager_two.get());
 
     REQUIRE(bookmarkAsXml(bookmark_one) != bookmarkAsXml(bookmark_two));
diff --git a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
deleted file mode 100644
index 5a725a2..0000000
--- a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_ABSTRACTCORECOMPONENTSTATEMANAGERPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_ABSTRACTCORECOMPONENTSTATEMANAGERPROVIDER_H_
-
-#include <unordered_map>
-#include <string>
-#include <memory>
-#include <map>
-
-#include "core/Core.h"
-#include "core/CoreComponentState.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace controllers {
-
-class AbstractCoreComponentStateManagerProvider : public core::CoreComponentStateManagerProvider {
- public:
-  std::unique_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) override;
-
-  std::map<utils::Identifier, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
-
-  class AbstractCoreComponentStateManager : public core::CoreComponentStateManager {
-   public:
-    AbstractCoreComponentStateManager(AbstractCoreComponentStateManagerProvider* provider, const utils::Identifier& id);
-
-    bool set(const core::CoreComponentState& kvs) override;
-    bool get(core::CoreComponentState& kvs) override;
-    bool clear() override;
-    bool persist() override;
-
-    bool isTransactionInProgress() const override;
-    bool beginTransaction() override;
-    bool commit() override;
-    bool rollback() override;
-
-   private:
-    enum class ChangeType {
-      NONE,
-      SET,
-      CLEAR
-    };
-
-    AbstractCoreComponentStateManagerProvider* provider_;
-    utils::Identifier id_;
-    bool state_valid_;
-    core::CoreComponentState state_;
-    bool transaction_in_progress_;
-    ChangeType change_type_;
-    core::CoreComponentState state_to_set_;
-  };
-
- protected:
-  virtual bool setImpl(const utils::Identifier& key, const std::string& serialized_state) = 0;
-  virtual bool getImpl(const utils::Identifier& key, std::string& serialized_state) = 0;
-  virtual bool getImpl(std::map<utils::Identifier, std::string>& kvs) = 0;
-  virtual bool removeImpl(const utils::Identifier& key) = 0;
-  virtual bool persistImpl() = 0;
-
- private:
-  std::mutex mutex_;
-};
-
-}  // namespace controllers
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_ABSTRACTCORECOMPONENTSTATEMANAGERPROVIDER_H_
diff --git a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/AutoPersistor.h
similarity index 61%
rename from libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
rename to libminifi/include/controllers/keyvalue/AutoPersistor.h
index 35a87bb..3cefe8c 100644
--- a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
+++ b/libminifi/include/controllers/keyvalue/AutoPersistor.h
@@ -16,47 +16,46 @@
  */
 #pragma once
 
+#include <condition_variable>
+#include <memory>
+#include <mutex>
 #include <string>
 #include <thread>
-#include <mutex>
-#include <memory>
 #include <utility>
 
-#include "PersistableKeyValueStoreService.h"
+#include "core/ConfigurableComponent.h"
 #include "core/Core.h"
-#include "properties/Configure.h"
-#include "core/logging/Logger.h"
 #include "core/logging/LoggerFactory.h"
 #include "utils/Export.h"
 
 namespace org::apache::nifi::minifi::controllers {
 
-class AbstractAutoPersistingKeyValueStoreService : virtual public PersistableKeyValueStoreService {
+/**
+ *  Persists in given intervals.
+ *  Has an own thread, so stop() must be called before destruction of data used by persist_.
+ */
+class AutoPersistor {
  public:
-  explicit AbstractAutoPersistingKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
+  ~AutoPersistor();
 
-  ~AbstractAutoPersistingKeyValueStoreService() override;
+  void start(bool always_persist, std::chrono::milliseconds auto_persistence_interval, std::function<bool()> persist);
+  void stop();
 
-  static constexpr const char* AlwaysPersistPropertyName = "Always Persist";
-  static constexpr const char* AutoPersistenceIntervalPropertyName = "Auto Persistence Interval";
-
-  void onEnable() override;
-  void notifyStop() override;
-
- protected:
-  bool always_persist_;
-  std::chrono::milliseconds auto_persistence_interval_;
-
-  std::thread persisting_thread_;
-  bool running_;
-  std::mutex persisting_mutex_;
-  std::condition_variable persisting_cv_;
-  void persistingThreadFunc();
+  [[nodiscard]] bool isAlwaysPersisting() const {
+    return always_persist_;
+  }
 
  private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger();
+  void persistingThreadFunc();
 
-  void stopPersistingThread();
+  bool always_persist_ = false;
+  std::chrono::milliseconds auto_persistence_interval_{0};
+  std::thread persisting_thread_;
+  bool running_ = false;
+  std::mutex persisting_mutex_;
+  std::condition_variable persisting_cv_;
+  std::function<bool()> persist_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AutoPersistor>::getLogger();
 };
 
 }  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStateManager.h b/libminifi/include/controllers/keyvalue/KeyValueStateManager.h
new file mode 100644
index 0000000..19df889
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/KeyValueStateManager.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "core/StateManager.h"
+#include "core/Core.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class KeyValueStateStorage;
+
+class KeyValueStateManager final : public core::StateManager {
+ public:
+  KeyValueStateManager(const utils::Identifier& id, gsl::not_null<KeyValueStateStorage*> storage);
+
+  bool set(const core::StateManager::State& kvs) override;
+  bool get(core::StateManager::State& kvs) override;
+  bool clear() override;
+  bool persist() override;
+
+  [[nodiscard]] bool isTransactionInProgress() const override;
+  bool beginTransaction() override;
+  bool commit() override;
+  bool rollback() override;
+
+ private:
+  enum class ChangeType {
+    NONE,
+    SET,
+    CLEAR
+  };
+
+  gsl::not_null<KeyValueStateStorage*> storage_;
+  std::optional<core::StateManager::State> state_;
+  bool transaction_in_progress_;
+  ChangeType change_type_;
+  core::StateManager::State state_to_set_;
+};
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h b/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
new file mode 100644
index 0000000..c0f2e51
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "core/controller/ControllerService.h"
+#include "core/Core.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/StateManager.h"
+#include "core/StateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+constexpr const char* ALWAYS_PERSIST_PROPERTY_NAME = "Always Persist";
+constexpr const char* AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME = "Auto Persistence Interval";
+
+class KeyValueStateStorage : public core::StateStorage, public core::controller::ControllerService {
+ public:
+  explicit KeyValueStateStorage(const std::string& name, const utils::Identifier& uuid = {});
+
+  static core::StateManager::State deserialize(const std::string& serialized);
+  static std::string serialize(const core::StateManager::State& kvs);
+
+  std::unique_ptr<core::StateManager> getStateManager(const utils::Identifier& uuid) override;
+  std::unordered_map<utils::Identifier, core::StateManager::State> getAllStates() override;
+
+  void yield() override {
+  }
+
+  bool isRunning() override {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+
+  bool isWorkAvailable() override {
+    return false;
+  }
+
+  virtual bool set(const std::string& key, const std::string& value) = 0;
+  virtual bool get(const std::string& key, std::string& value) = 0;
+  virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+  virtual bool remove(const std::string& key) = 0;
+  virtual bool clear() = 0;
+  virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) = 0;
+  virtual bool persist() = 0;
+
+ private:
+  bool getAll(std::unordered_map<utils::Identifier, std::string>& kvs);
+
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<KeyValueStateStorage>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStoreService.h b/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
deleted file mode 100644
index 489b977..0000000
--- a/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_KEYVALUESTORESERVICE_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_KEYVALUESTORESERVICE_H_
-
-#include <unordered_map>
-#include <unordered_set>
-#include <string>
-#include <cstdint>
-#include <functional>
-
-#include "core/Core.h"
-#include "properties/Configure.h"
-#include "core/controller/ControllerService.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-class KeyValueStoreService : public core::controller::ControllerService {
- public:
-  explicit KeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-
-  ~KeyValueStoreService() override;
-
-  void yield() override;
-  bool isRunning() override;
-  bool isWorkAvailable() override;
-
-  virtual bool set(const std::string& key, const std::string& value) = 0;
-
-  virtual bool get(const std::string& key, std::string& value) = 0;
-
-  virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
-
-  virtual bool remove(const std::string& key) = 0;
-
-  virtual bool clear() = 0;
-
-  virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) = 0;
-};
-
-}  // namespace org::apache::nifi::minifi::controllers
-
-#endif  // LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_KEYVALUESTORESERVICE_H_
diff --git a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
deleted file mode 100644
index 80b49f6..0000000
--- a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_PERSISTABLEKEYVALUESTORESERVICE_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_PERSISTABLEKEYVALUESTORESERVICE_H_
-
-#include <string>
-#include <unordered_map>
-#include <map>
-
-#include "KeyValueStoreService.h"
-#include "AbstractCoreComponentStateManagerProvider.h"
-#include "core/Core.h"
-#include "properties/Configure.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-class PersistableKeyValueStoreService : public KeyValueStoreService, public AbstractCoreComponentStateManagerProvider {
- public:
-  explicit PersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-
-  ~PersistableKeyValueStoreService() override;
-
-  virtual bool persist() = 0;
-
- protected:
-  bool setImpl(const utils::Identifier& key, const std::string& serialized_state) override;
-  bool getImpl(const utils::Identifier& key, std::string& serialized_state) override;
-  bool getImpl(std::map<utils::Identifier, std::string>& kvs) override;
-  bool removeImpl(const utils::Identifier& key) override;
-  bool persistImpl() override;
-};
-
-}  // namespace org::apache::nifi::minifi::controllers
-
-#endif  // LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_PERSISTABLEKEYVALUESTORESERVICE_H_
diff --git a/libminifi/include/core/CoreComponentState.h b/libminifi/include/core/CoreComponentState.h
deleted file mode 100644
index a0662a6..0000000
--- a/libminifi/include/core/CoreComponentState.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBMINIFI_INCLUDE_CORE_CORECOMPONENTSTATE_H_
-#define LIBMINIFI_INCLUDE_CORE_CORECOMPONENTSTATE_H_
-
-#include "Core.h"
-
-#include <cstdint>
-#include <map>
-#include <memory>
-#include <optional>
-#include <string>
-#include <unordered_map>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-using CoreComponentState = std::unordered_map<std::string, std::string>;
-
-class CoreComponentStateManager {
- public:
-  virtual ~CoreComponentStateManager() = default;
-
-  virtual bool set(const CoreComponentState& kvs) = 0;
-  virtual bool get(CoreComponentState& kvs) = 0;
-
-  std::optional<std::unordered_map<std::string, std::string>> get() {
-    std::unordered_map<std::string, std::string> out;
-    if (get(out)) {
-      return out;
-    } else {
-      return std::nullopt;
-    }
-  }
-
-  virtual bool clear() = 0;
-  virtual bool persist() = 0;
-
-  virtual bool isTransactionInProgress() const = 0;
-  virtual bool beginTransaction() = 0;
-  virtual bool commit() = 0;
-  virtual bool rollback() = 0;
-};
-
-class CoreComponentStateManagerProvider {
- public:
-  virtual ~CoreComponentStateManagerProvider() = default;
-
-  virtual std::unique_ptr<CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) = 0;
-
-  virtual std::unique_ptr<CoreComponentStateManager> getCoreComponentStateManager(const CoreComponent& component) {
-    return getCoreComponentStateManager(component.getUUID());
-  }
-
-  virtual std::map<utils::Identifier, CoreComponentState> getAllCoreComponentStates() = 0;
-};
-
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_CORECOMPONENTSTATE_H_
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 2f4c23e..f008e66 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -34,21 +34,19 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ControllerServiceLookup.h"
 #include "core/logging/LoggerFactory.h"
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
 #include "ProcessorNode.h"
 #include "core/Repository.h"
 #include "core/FlowFile.h"
-#include "core/CoreComponentState.h"
+#include "core/StateStorage.h"
 #include "utils/file/FileUtils.h"
 #include "utils/PropertyErrors.h"
 #include "VariableRegistry.h"
 
 namespace org::apache::nifi::minifi::core {
 
-// ProcessContext Class
 class ProcessContext : public controller::ControllerServiceLookup, public core::VariableRegistry, public std::enable_shared_from_this<VariableRegistry> {
  public:
-  // Constructor
   /*!
    * Create a new process context associated with the processor/controller service/state manager
    */
@@ -63,10 +61,9 @@
         configure_(std::make_shared<minifi::Configure>()),
         initialized_(false) {
     repo_ = repo;
-    state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, nullptr);
+    state_storage_ = getStateStorage(logger_, controller_service_provider_, nullptr);
   }
 
-  // Constructor
   /*!
    * Create a new process context associated with the processor/controller service/state manager
    */
@@ -82,13 +79,12 @@
         configure_(configuration),
         initialized_(false) {
     repo_ = repo;
-    state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, configuration);
+    state_storage_ = getStateStorage(logger_, controller_service_provider_, configuration);
     if (!configure_) {
       configure_ = std::make_shared<minifi::Configure>();
     }
   }
-  // Destructor
-  virtual ~ProcessContext() = default;
+
   // Get Processor associated with the Process Context
   std::shared_ptr<ProcessorNode> getProcessorNode() const {
     return processor_node_;
@@ -155,17 +151,13 @@
   bool setProperty(const Property& property, std::string value) {
     return setProperty(property.getName(), value);
   }
-  // Whether the relationship is supported
-  bool isSupportedRelationship(Relationship relationship) const {
-    return processor_node_->isSupportedRelationship(relationship);
-  }
 
   // Check whether the relationship is auto terminated
   bool isAutoTerminated(Relationship relationship) const {
     return processor_node_->isAutoTerminated(relationship);
   }
   // Get ProcessContext Maximum Concurrent Tasks
-  uint8_t getMaxConcurrentTasks(void) const {
+  uint8_t getMaxConcurrentTasks() const {
     return processor_node_->getMaxConcurrentTasks();
   }
   // Yield based on the yield period
@@ -246,14 +238,14 @@
       return initialized_;
   }
 
-  static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider";
+  static constexpr char const* DefaultStateStorageName = "defaultstatestorage";
 
-  CoreComponentStateManager* getStateManager() {
-    if (state_manager_provider_ == nullptr) {
+  StateManager* getStateManager() {
+    if (state_storage_ == nullptr) {
       return nullptr;
     }
     if (!state_manager_) {
-      state_manager_ = state_manager_provider_->getCoreComponentStateManager(*processor_node_);
+      state_manager_ = state_storage_->getStateManager(*processor_node_);
     }
     return state_manager_.get();
   }
@@ -262,64 +254,64 @@
     return state_manager_ != nullptr;
   }
 
-  static std::shared_ptr<core::CoreComponentStateManagerProvider> getOrCreateDefaultStateManagerProvider(
+  static std::shared_ptr<core::StateStorage> getOrCreateDefaultStateStorage(
       controller::ControllerServiceProvider* controller_service_provider,
       const std::shared_ptr<minifi::Configure>& configuration) {
     static std::mutex mutex;
     std::lock_guard<std::mutex> lock(mutex);
 
     /* See if we have already created a default provider */
-    std::shared_ptr<core::controller::ControllerServiceNode> node = controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName);
+    std::shared_ptr<core::controller::ControllerServiceNode> node = controller_service_provider->getControllerServiceNode(DefaultStateStorageName);
     if (node != nullptr) {
-      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+      return std::dynamic_pointer_cast<core::StateStorage>(node->getControllerServiceImplementation());
     }
 
     /* Try to get configuration options for default provider */
-    std::string always_persist, auto_persistence_interval;
-    configuration->get(Configure::nifi_state_management_provider_local_always_persist, always_persist);
-    configuration->get(Configure::nifi_state_management_provider_local_auto_persistence_interval, auto_persistence_interval);
-    const auto path = configuration->get(Configure::nifi_state_management_provider_local_path);
+    std::string always_persist;
+    configuration->get(Configure::nifi_state_storage_local_always_persist, Configure::nifi_state_storage_local_always_persist_old, always_persist);
+    std::string auto_persistence_interval;
+    configuration->get(Configure::nifi_state_storage_local_auto_persistence_interval, Configure::nifi_state_storage_local_auto_persistence_interval_old, auto_persistence_interval);
 
-    /* Function to help creating a provider */
+    const auto path = configuration->getWithFallback(Configure::nifi_state_storage_local_path, Configure::nifi_state_storage_local_path_old);
+
+    /* Function to help creating a state storage */
     auto create_provider = [&](
         const std::string& type,
         const std::string& longType,
-        const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::CoreComponentStateManagerProvider> {
-      node = controller_service_provider->createControllerService(type, longType, DefaultStateManagerProviderName, true /*firstTimeAdded*/);
+        const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::StateStorage> {
+      node = controller_service_provider->createControllerService(type, longType, DefaultStateStorageName, true /*firstTimeAdded*/);
       if (node == nullptr) {
         return nullptr;
       }
       node->initialize();
-      auto provider = node->getControllerServiceImplementation();
-      if (provider == nullptr) {
+      auto storage = node->getControllerServiceImplementation();
+      if (storage == nullptr) {
         return nullptr;
       }
-      if (!always_persist.empty() && !provider->setProperty(
-          controllers::AbstractAutoPersistingKeyValueStoreService::AlwaysPersistPropertyName, always_persist)) {
+      if (!always_persist.empty() && !storage->setProperty(controllers::ALWAYS_PERSIST_PROPERTY_NAME, always_persist)) {
         return nullptr;
       }
-      if (!auto_persistence_interval.empty() && !provider->setProperty(
-          controllers::AbstractAutoPersistingKeyValueStoreService::AutoPersistenceIntervalPropertyName, auto_persistence_interval)) {
+      if (!auto_persistence_interval.empty() && !storage->setProperty(controllers::AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME, auto_persistence_interval)) {
         return nullptr;
       }
       for (const auto& extraProperty : extraProperties) {
-        if (!provider->setProperty(extraProperty.first, extraProperty.second)) {
+        if (!storage->setProperty(extraProperty.first, extraProperty.second)) {
           return nullptr;
         }
       }
       if (!node->enable()) {
         return nullptr;
       }
-      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(provider);
+      return std::dynamic_pointer_cast<core::StateStorage>(storage);
     };
 
     std::string preferredType;
-    configuration->get(minifi::Configure::nifi_state_management_provider_local_class_name, preferredType);
+    configuration->get(minifi::Configure::nifi_state_storage_local_class_name, minifi::Configure::nifi_state_storage_local_class_name_old, preferredType);
 
     /* Try to create a RocksDB-backed provider */
-    if (preferredType.empty() || preferredType == "RocksDbPersistableKeyValueStoreService") {
-      auto provider = create_provider("RocksDbPersistableKeyValueStoreService",
-                                      "org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService",
+    if (preferredType.empty() || preferredType == "RocksDbPersistableKeyValueStoreService" || preferredType == "RocksDbStateStorage") {
+      auto provider = create_provider("RocksDbStateStorage",
+                                      "org.apache.nifi.minifi.controllers.RocksDbStateStorage",
                                       {{"Directory", path.value_or("corecomponentstate")}});
       if (provider != nullptr) {
         return provider;
@@ -327,9 +319,9 @@
     }
 
     /* Fall back to a locked unordered map-backed provider */
-    if (preferredType.empty() || preferredType == "UnorderedMapPersistableKeyValueStoreService") {
-      auto provider = create_provider("UnorderedMapPersistableKeyValueStoreService",
-                                 "org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService",
+    if (preferredType.empty() || preferredType == "UnorderedMapPersistableKeyValueStoreService" || preferredType == "PersistentMapStateStorage") {
+      auto provider = create_provider("PersistentMapStateStorage",
+                                 "org.apache.nifi.minifi.controllers.PersistentMapStateStorage",
                                  {{"File", path.value_or("corecomponentstate.txt")}});
       if (provider != nullptr) {
         return provider;
@@ -337,9 +329,9 @@
     }
 
     /* Fall back to volatile memory-backed provider */
-    if (preferredType.empty() || preferredType == "UnorderedMapKeyValueStoreService") {
-      auto provider = create_provider("UnorderedMapKeyValueStoreService",
-                                      "org.apache.nifi.minifi.controllers.UnorderedMapKeyValueStoreService",
+    if (preferredType.empty() || preferredType == "UnorderedMapKeyValueStoreService" || preferredType == "VolatileMapStateStorage") {
+      auto provider = create_provider("VolatileMapStateStorage",
+                                      "org.apache.nifi.minifi.controllers.VolatileMapStateStorage",
                                       {});
       if (provider != nullptr) {
         return provider;
@@ -350,27 +342,27 @@
     return nullptr;
   }
 
-  static std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider(
+  static std::shared_ptr<core::StateStorage> getStateStorage(
       const std::shared_ptr<logging::Logger>& logger,
       controller::ControllerServiceProvider* const controller_service_provider,
       const std::shared_ptr<minifi::Configure>& configuration) {
     if (controller_service_provider == nullptr) {
       return nullptr;
     }
-    std::string requestedStateManagerName;
-    if (configuration != nullptr && configuration->get(minifi::Configure::nifi_state_management_provider_local, requestedStateManagerName)) {
-      auto node = controller_service_provider->getControllerServiceNode(requestedStateManagerName);
+    std::string requestedStateStorageName;
+    if (configuration != nullptr && configuration->get(minifi::Configure::nifi_state_storage_local, minifi::Configure::nifi_state_storage_local_old, requestedStateStorageName)) {
+      auto node = controller_service_provider->getControllerServiceNode(requestedStateStorageName);
       if (node == nullptr) {
-        logger->log_error("Failed to find the CoreComponentStateManagerProvider %s defined by %s", requestedStateManagerName, minifi::Configure::nifi_state_management_provider_local);
+        logger->log_error("Failed to find the StateStorage %s defined by %s", requestedStateStorageName, minifi::Configure::nifi_state_storage_local);
         return nullptr;
       }
-      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+      return std::dynamic_pointer_cast<core::StateStorage>(node->getControllerServiceImplementation());
     } else {
-      auto state_manager_provider = getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration);
-      if (state_manager_provider == nullptr) {
-        logger->log_error("Failed to create default CoreComponentStateManagerProvider");
+      auto state_storage = getOrCreateDefaultStateStorage(controller_service_provider, configuration);
+      if (state_storage == nullptr) {
+        logger->log_error("Failed to create default StateStorage");
       }
-      return state_manager_provider;
+      return state_storage;
     }
   }
 
@@ -381,8 +373,8 @@
   }
 
   controller::ControllerServiceProvider* controller_service_provider_;
-  std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
-  std::unique_ptr<CoreComponentStateManager> state_manager_;
+  std::shared_ptr<core::StateStorage> state_storage_;
+  std::unique_ptr<StateManager> state_manager_;
   std::shared_ptr<core::Repository> repo_;
   std::shared_ptr<core::Repository> flow_repo_;
   std::shared_ptr<core::ContentRepository> content_repo_;
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 208f7f1..49d8efe 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -215,7 +215,7 @@
 
   std::shared_ptr<ContentSession> content_session_;
 
-  CoreComponentStateManager* stateManager_;
+  StateManager* stateManager_;
 
   static std::shared_ptr<utils::IdGenerator> id_generator_;
 
diff --git a/libminifi/include/core/StateManager.h b/libminifi/include/core/StateManager.h
new file mode 100644
index 0000000..89ef8b2
--- /dev/null
+++ b/libminifi/include/core/StateManager.h
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "Core.h"
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+
+namespace org::apache::nifi::minifi::core {
+
+/**
+ * Stores state for one component.
+ * Supported operations: get(), set(), clear(), persist().
+ * Behavior can be transactional. Use beginTransaction() to enter a transaction and commit() or rollback() to conclude it.
+ */
+class StateManager {
+ public:
+  using State = std::unordered_map<std::string, std::string>;
+
+  explicit StateManager(const utils::Identifier& id)
+    : id_(id) {
+  }
+
+  virtual ~StateManager() = default;
+
+  virtual bool set(const State& kvs) = 0;
+  virtual bool get(State& kvs) = 0;
+
+  std::optional<State> get() {
+    if (State out; get(out)) {
+      return out;
+    }
+    return std::nullopt;
+  }
+
+  virtual bool clear() = 0;
+  virtual bool persist() = 0;
+
+  [[nodiscard]] virtual bool isTransactionInProgress() const = 0;
+
+  virtual bool beginTransaction() = 0;
+  virtual bool commit() = 0;
+  virtual bool rollback() = 0;
+
+ protected:
+  utils::Identifier id_;
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/core/StateStorage.h b/libminifi/include/core/StateStorage.h
new file mode 100644
index 0000000..6870b47
--- /dev/null
+++ b/libminifi/include/core/StateStorage.h
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "Core.h"
+#include "StateManager.h"
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+namespace org::apache::nifi::minifi::core {
+
+/**
+ * Serves as a state storage background for the entire application, all StateManagers are created by it and use it.
+ */
+class StateStorage {
+ public:
+  virtual ~StateStorage() = default;
+
+  virtual std::unique_ptr<StateManager> getStateManager(const utils::Identifier& uuid) = 0;
+
+  std::unique_ptr<StateManager> getStateManager(const CoreComponent& component) {
+    return getStateManager(component.getUUID());
+  }
+
+  virtual std::unordered_map<utils::Identifier, StateManager::State> getAllStates() = 0;
+};
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 24d0d28..eb2f598 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -121,11 +121,16 @@
   static constexpr const char *nifi_c2_mqtt_update_topic = "nifi.c2.mqtt.update.topic";
 
   // state management options
-  static constexpr const char *nifi_state_management_provider_local = "nifi.state.management.provider.local";
-  static constexpr const char *nifi_state_management_provider_local_class_name = "nifi.state.management.provider.local.class.name";
-  static constexpr const char *nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
-  static constexpr const char *nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
-  static constexpr const char *nifi_state_management_provider_local_path = "nifi.state.management.provider.local.path";
+  static constexpr const char *nifi_state_storage_local = "nifi.state.storage.local";
+  static constexpr const char *nifi_state_storage_local_old = "nifi.state.management.provider.local";
+  static constexpr const char *nifi_state_storage_local_class_name = "nifi.state.storage.local.class.name";
+  static constexpr const char *nifi_state_storage_local_class_name_old = "nifi.state.management.provider.local.class.name";
+  static constexpr const char *nifi_state_storage_local_always_persist = "nifi.state.storage.local.always.persist";
+  static constexpr const char *nifi_state_storage_local_always_persist_old = "nifi.state.management.provider.local.always.persist";
+  static constexpr const char *nifi_state_storage_local_auto_persistence_interval = "nifi.state.storage.local.auto.persistence.interval";
+  static constexpr const char *nifi_state_storage_local_auto_persistence_interval_old = "nifi.state.management.provider.local.auto.persistence.interval";
+  static constexpr const char *nifi_state_storage_local_path = "nifi.state.storage.local.path";
+  static constexpr const char *nifi_state_storage_local_path_old = "nifi.state.management.provider.local.path";
 
   // disk space watchdog options
   static constexpr const char *minifi_disk_space_watchdog_enable = "minifi.disk.space.watchdog.enable";
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 76d8b8f..46a6eff 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -28,20 +28,20 @@
 
 struct ConfigTestAccessor;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 class Configure : public Configuration, public core::AgentIdentificationProvider {
   friend struct ::ConfigTestAccessor;
  public:
   explicit Configure(std::optional<Decryptor> decryptor = std::nullopt, std::shared_ptr<core::logging::LoggerProperties> logger_properties = {})
-      : Configuration{}, decryptor_(std::move(decryptor)), logger_properties_(std::move(logger_properties)) {}
+      : decryptor_(std::move(decryptor))
+      , logger_properties_(std::move(logger_properties)) {
+  }
 
   bool get(const std::string& key, std::string& value) const;
   bool get(const std::string& key, const std::string& alternate_key, std::string& value) const;
   std::optional<std::string> get(const std::string& key) const;
+  std::optional<std::string> getWithFallback(const std::string& key, const std::string& alternate_key) const;
   std::optional<std::string> getRawValue(const std::string& key) const;
 
   std::optional<std::string> getAgentClass() const override;
@@ -56,7 +56,7 @@
  private:
   // WARNING! a test utility
   void setLoggerProperties(std::shared_ptr<core::logging::LoggerProperties> new_properties) {
-    logger_properties_ = new_properties;
+    logger_properties_ = std::move(new_properties);
   }
 
   bool isEncrypted(const std::string& key) const;
@@ -68,7 +68,4 @@
   std::shared_ptr<core::logging::LoggerProperties> logger_properties_;
 };
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/utils/ListingStateManager.h b/libminifi/include/utils/ListingStateManager.h
index d97e149..7fa9501 100644
--- a/libminifi/include/utils/ListingStateManager.h
+++ b/libminifi/include/utils/ListingStateManager.h
@@ -25,7 +25,7 @@
 #include <chrono>
 #include <utility>
 
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerFactory.h"
 
@@ -49,7 +49,7 @@
 
 class ListingStateManager {
  public:
-  explicit ListingStateManager(core::CoreComponentStateManager* state_manager)
+  explicit ListingStateManager(core::StateManager* state_manager)
     : state_manager_(state_manager) {
   }
 
@@ -63,7 +63,7 @@
   [[nodiscard]] static uint64_t getLatestListedKeyTimestampInMilliseconds(const std::unordered_map<std::string, std::string> &state);
   [[nodiscard]] static std::unordered_set<std::string> getLatestListedKeys(const std::unordered_map<std::string, std::string> &state);
 
-  core::CoreComponentStateManager* state_manager_;
+  core::StateManager* state_manager_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ListingStateManager>::getLogger()};
 };
 
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index baba871..69b24e9 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -98,11 +98,16 @@
   core::ConfigurationProperty{Configuration::nifi_c2_mqtt_connector_service},
   core::ConfigurationProperty{Configuration::nifi_c2_mqtt_heartbeat_topic},
   core::ConfigurationProperty{Configuration::nifi_c2_mqtt_update_topic},
-  core::ConfigurationProperty{Configuration::nifi_state_management_provider_local},
-  core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_class_name},
-  core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_always_persist, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
-  core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_auto_persistence_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
-  core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_path},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_old},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_class_name},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_class_name_old},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_always_persist, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_always_persist_old, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_auto_persistence_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_auto_persistence_interval_old, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_path},
+  core::ConfigurationProperty{Configuration::nifi_state_storage_local_path_old},
   core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_stop_threshold, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get())},
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 7067c26..60f27c8 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -24,10 +24,7 @@
 #include "utils/StringUtils.h"
 #include "properties/Configuration.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 bool Configure::get(const std::string& key, std::string& value) const {
   if (auto opt_value = getRawValue(key)) {
@@ -64,6 +61,14 @@
   return std::nullopt;
 }
 
+std::optional<std::string> Configure::getWithFallback(const std::string& key, const std::string& alternate_key) const {
+  std::string value;
+  if (get(key, alternate_key, value)) {
+    return value;
+  }
+  return std::nullopt;
+}
+
 std::optional<std::string> Configure::getRawValue(const std::string& key) const {
   static constexpr std::string_view log_prefix = "nifi.log.";
   if (utils::StringUtils::startsWith(key, log_prefix)) {
@@ -129,7 +134,4 @@
   return success;
 }
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index fa17168..76f5be0 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -31,7 +31,7 @@
 
 #include "c2/ControllerSocketProtocol.h"
 #include "core/ProcessContext.h"
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
 #include "core/state/UpdateController.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -450,11 +450,11 @@
     }
     case ClearOperand::CORECOMPONENTSTATE: {
       for (const auto& corecomponent : resp.operation_arguments) {
-        auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
-        if (state_manager_provider != nullptr) {
-          update_sink_->executeOnComponent(corecomponent.second.to_string(), [this, &state_manager_provider] (state::StateController& component) {
+        auto state_storage = core::ProcessContext::getStateStorage(logger_, controller_, configuration_);
+        if (state_storage != nullptr) {
+          update_sink_->executeOnComponent(corecomponent.second.to_string(), [this, &state_storage] (state::StateController& component) {
             logger_->log_debug("Clearing state for component %s", component.getComponentName());
-            auto state_manager = state_manager_provider->getCoreComponentStateManager(component.getComponentUUID());
+            auto state_manager = state_storage->getStateManager(component.getComponentUUID());
             if (state_manager != nullptr) {
               component.stop();
               state_manager->clear();
@@ -465,7 +465,7 @@
             }
           });
         } else {
-          logger_->log_error("Failed to get StateManagerProvider");
+          logger_->log_error("Failed to get StateStorage");
         }
       }
       break;
@@ -562,9 +562,9 @@
       response.setLabel("corecomponentstate");
       C2Payload states(Operation::ACKNOWLEDGE, resp.ident, true);
       states.setLabel("corecomponentstate");
-      auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
-      if (state_manager_provider != nullptr) {
-        auto core_component_states = state_manager_provider->getAllCoreComponentStates();
+      auto state_storage = core::ProcessContext::getStateStorage(logger_, controller_, configuration_);
+      if (state_storage != nullptr) {
+        auto core_component_states = state_storage->getAllStates();
         for (const auto& core_component_state : core_component_states) {
           C2Payload state(Operation::ACKNOWLEDGE, resp.ident, true);
           state.setLabel(core_component_state.first.to_string());
diff --git a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
deleted file mode 100644
index 4a85358..0000000
--- a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
-
-#include <cinttypes>
-
-using namespace std::literals::chrono_literals;
-
-namespace org::apache::nifi::minifi::controllers {
-
-AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
-    : PersistableKeyValueStoreService(std::move(name), uuid)
-    , always_persist_(false)
-    , auto_persistence_interval_(0U)
-    , running_(false) {
-}
-
-AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService() {
-  stopPersistingThread();
-}
-
-void AbstractAutoPersistingKeyValueStoreService::stopPersistingThread() {
-  std::unique_lock<std::mutex> lock(persisting_mutex_);
-  if (persisting_thread_.joinable()) {
-    running_ = false;
-    persisting_cv_.notify_one();
-    lock.unlock();
-    persisting_thread_.join();
-  }
-}
-
-void AbstractAutoPersistingKeyValueStoreService::onEnable() {
-  std::unique_lock<std::mutex> lock(persisting_mutex_);
-
-  if (configuration_ == nullptr) {
-    logger_->log_debug("Cannot enable AbstractAutoPersistingKeyValueStoreService");
-    return;
-  }
-
-  std::string value;
-  if (!getProperty(AlwaysPersistPropertyName, value)) {
-    logger_->log_error("Always Persist attribute is missing or invalid");
-  } else {
-    always_persist_ = utils::StringUtils::toBool(value).value_or(false);
-  }
-  core::TimePeriodValue auto_persistence_interval;
-  if (!getProperty(AutoPersistenceIntervalPropertyName, auto_persistence_interval)) {
-    logger_->log_error("Auto Persistence Interval attribute is missing or invalid");
-  } else {
-    auto_persistence_interval_ = auto_persistence_interval.getMilliseconds();
-  }
-
-  if (!always_persist_ && auto_persistence_interval_ != 0s) {
-    if (!persisting_thread_.joinable()) {
-      logger_->log_trace("Starting auto persistence thread");
-      running_ = true;
-      persisting_thread_ = std::thread(&AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc, this);
-    }
-  }
-
-  logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService");
-}
-
-void AbstractAutoPersistingKeyValueStoreService::notifyStop() {
-  stopPersistingThread();
-}
-
-void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
-  std::unique_lock<std::mutex> lock(persisting_mutex_);
-
-  while (true) {
-    logger_->log_trace("Persisting thread is going to sleep for %" PRId64 " ms", int64_t{auto_persistence_interval_.count()});
-    persisting_cv_.wait_for(lock, auto_persistence_interval_, [this] {
-      return !running_;
-    });
-
-    if (!running_) {
-      logger_->log_trace("Stopping persistence thread");
-      return;
-    }
-
-    persist();
-  }
-}
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
deleted file mode 100644
index b9e4df3..0000000
--- a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
-#include "Exception.h"
-
-#include <memory>
-
-#include "rapidjson/rapidjson.h"
-#include "rapidjson/document.h"
-#include "rapidjson/stringbuffer.h"
-#include "rapidjson/writer.h"
-
-#undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson
-
-namespace {
-using org::apache::nifi::minifi::core::CoreComponentState;
-
-std::string serialize(const CoreComponentState &kvs) {
-  rapidjson::Document doc(rapidjson::kObjectType);
-  rapidjson::Document::AllocatorType &alloc = doc.GetAllocator();
-  for (const auto &kv : kvs) {
-    doc.AddMember(rapidjson::StringRef(kv.first.c_str(), kv.first.size()),
-                  rapidjson::StringRef(kv.second.c_str(), kv.second.size()), alloc);
-  }
-
-  rapidjson::StringBuffer buffer;
-  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
-  doc.Accept(writer);
-
-  return buffer.GetString();
-}
-
-CoreComponentState deserialize(const std::string &serialized) {
-  rapidjson::StringStream stream(serialized.c_str());
-  rapidjson::Document doc;
-  rapidjson::ParseResult res = doc.ParseStream(stream);
-  if (!res || !doc.IsObject()) {
-    using org::apache::nifi::minifi::Exception;
-    using org::apache::nifi::minifi::FILE_OPERATION_EXCEPTION;
-    throw Exception(FILE_OPERATION_EXCEPTION, "Could not deserialize saved state, error during JSON parsing.");
-  }
-
-  CoreComponentState retState;
-  for (const auto &kv : doc.GetObject()) {
-    retState[kv.name.GetString()] = kv.value.GetString();
-  }
-
-  return retState;
-}
-}  // anonymous namespace
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace controllers {
-
-AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
-    AbstractCoreComponentStateManagerProvider* provider,
-    const utils::Identifier& id)
-    : provider_(provider)
-    , id_(id)
-    , state_valid_(false)
-    , transaction_in_progress_(false)
-    , change_type_(ChangeType::NONE) {
-  std::string serialized;
-  if (provider_->getImpl(id_, serialized)) {
-    state_ = deserialize(serialized);
-    state_valid_ = true;
-  }
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const core::CoreComponentState& kvs) {
-  bool autoCommit = false;
-  if (!transaction_in_progress_) {
-    autoCommit = true;
-    transaction_in_progress_ = true;
-  }
-
-  change_type_ = ChangeType::SET;
-  state_to_set_ = kvs;
-
-  if (autoCommit) {
-    return commit();
-  }
-  return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(core::CoreComponentState& kvs) {
-  if (!state_valid_) {
-    return false;
-  }
-  // not allowed, if there were modifications (dirty read)
-  if (change_type_ != ChangeType::NONE) {
-    return false;
-  }
-  kvs = state_;
-  return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear() {
-  if (!state_valid_) {
-    return false;
-  }
-
-  bool autoCommit = false;
-  if (!transaction_in_progress_) {
-    autoCommit = true;
-    transaction_in_progress_ = true;
-  }
-
-  change_type_ = ChangeType::CLEAR;
-  state_to_set_.clear();
-
-  if (autoCommit) {
-    return commit();
-  }
-  return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::persist() {
-  return provider_->persistImpl();
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::isTransactionInProgress() const {
-  return transaction_in_progress_;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::beginTransaction() {
-  if (transaction_in_progress_) {
-    return false;
-  }
-  transaction_in_progress_ = true;
-  return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::commit() {
-  if (!transaction_in_progress_) {
-    return false;
-  }
-
-  bool success = true;
-
-  // actually make the pending changes
-  if (change_type_ == ChangeType::SET) {
-    if (provider_->setImpl(id_, serialize(state_to_set_))) {
-      state_valid_ = true;
-      state_ = state_to_set_;
-    } else {
-      success = false;
-    }
-  } else if (change_type_ == ChangeType::CLEAR) {
-    if (!state_valid_) {  // NOLINT(bugprone-branch-clone)
-      success = false;
-    } else if (provider_->removeImpl(id_)) {
-      state_valid_ = false;
-      state_.clear();
-    } else {
-      success = false;
-    }
-  }
-
-  if (success && change_type_ != ChangeType::NONE) {
-    success = persist();
-  }
-
-  change_type_ = ChangeType::NONE;
-  state_to_set_.clear();
-  transaction_in_progress_ = false;
-  return success;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::rollback() {
-  if (!transaction_in_progress_) {
-    return false;
-  }
-
-  change_type_ = ChangeType::NONE;
-  state_to_set_.clear();
-  transaction_in_progress_ = false;
-  return true;
-}
-
-std::unique_ptr<core::CoreComponentStateManager> AbstractCoreComponentStateManagerProvider::getCoreComponentStateManager(const utils::Identifier& uuid) {
-  std::lock_guard<std::mutex> guard(mutex_);
-  return std::make_unique<AbstractCoreComponentStateManager>(this, uuid);
-}
-
-std::map<utils::Identifier, core::CoreComponentState> AbstractCoreComponentStateManagerProvider::getAllCoreComponentStates() {
-  std::map<utils::Identifier, std::string> all_serialized;
-  if (!getImpl(all_serialized)) {
-    return {};
-  }
-
-  std::map<utils::Identifier, core::CoreComponentState> all_deserialized;
-  for (const auto& serialized : all_serialized) {
-    all_deserialized.emplace(serialized.first, deserialize(serialized.second));
-  }
-
-  return all_deserialized;
-}
-
-}  // namespace controllers
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
diff --git a/libminifi/src/controllers/keyvalue/AutoPersistor.cpp b/libminifi/src/controllers/keyvalue/AutoPersistor.cpp
new file mode 100644
index 0000000..d5dcded
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/AutoPersistor.cpp
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cinttypes>
+
+#include "controllers/keyvalue/AutoPersistor.h"
+#include "core/PropertyBuilder.h"
+#include "core/TypedValues.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::controllers {
+
+AutoPersistor::~AutoPersistor() {
+  stop();
+}
+
+void AutoPersistor::stop() {
+  std::unique_lock<std::mutex> lock(persisting_mutex_);
+  if (persisting_thread_.joinable()) {
+    running_ = false;
+    lock.unlock();
+    persisting_cv_.notify_one();
+    persisting_thread_.join();
+  }
+}
+
+void AutoPersistor::start(bool always_persist, std::chrono::milliseconds auto_persistence_interval, std::function<bool()> persist) {
+  std::unique_lock<std::mutex> lock(persisting_mutex_);
+
+  always_persist_ = always_persist;
+  auto_persistence_interval_ = auto_persistence_interval;
+  persist_ = std::move(persist);
+
+  if (!always_persist_ && auto_persistence_interval_ != 0s) {
+    if (!persisting_thread_.joinable()) {
+      logger_->log_trace("Starting auto persistence thread");
+      running_ = true;
+      persisting_thread_ = std::thread(&AutoPersistor::persistingThreadFunc, this);
+    }
+  }
+
+  logger_->log_trace("Enabled AutoPersistor");
+}
+
+void AutoPersistor::persistingThreadFunc() {
+  std::unique_lock<std::mutex> lock(persisting_mutex_);
+
+  while (true) {
+    logger_->log_trace("Persisting thread is going to sleep for %" PRId64 " ms", int64_t{auto_persistence_interval_.count()});
+    persisting_cv_.wait_for(lock, auto_persistence_interval_, [this] {
+      return !running_;
+    });
+
+    if (!running_) {
+      logger_->log_trace("Stopping persistence thread");
+      return;
+    }
+
+    if (!persist_) {
+      logger_->log_error("Persist function is empty");
+    } else if (!persist_()) {
+      logger_->log_error("Persisting failed");
+    }
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp b/libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp
new file mode 100644
index 0000000..bf2f1f3
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+KeyValueStateManager::KeyValueStateManager(
+        const utils::Identifier& id,
+        gsl::not_null<KeyValueStateStorage*> storage)
+    : StateManager(id),
+      storage_(storage),
+      transaction_in_progress_(false),
+      change_type_(ChangeType::NONE) {
+  std::string serialized;
+  if (storage_->get(id_.to_string(), serialized)) {
+    state_ = KeyValueStateStorage::deserialize(serialized);
+  }
+}
+
+bool KeyValueStateManager::set(const core::StateManager::State& kvs) {
+  bool auto_commit = false;
+  if (!transaction_in_progress_) {
+    auto_commit = true;
+    transaction_in_progress_ = true;
+  }
+
+  change_type_ = ChangeType::SET;
+  state_to_set_ = kvs;
+
+  if (auto_commit) {
+    return commit();
+  }
+  return true;
+}
+
+bool KeyValueStateManager::get(core::StateManager::State& kvs) {
+  if (!state_) {
+    return false;
+  }
+  // not allowed, if there were modifications (dirty read)
+  if (change_type_ != ChangeType::NONE) {
+    return false;
+  }
+  kvs = *state_;
+  return true;
+}
+
+bool KeyValueStateManager::clear() {
+  if (!state_) {
+    return false;
+  }
+
+  bool auto_commit = false;
+  if (!transaction_in_progress_) {
+    auto_commit = true;
+    transaction_in_progress_ = true;
+  }
+
+  change_type_ = ChangeType::CLEAR;
+  state_to_set_.clear();
+
+  if (auto_commit) {
+    return commit();
+  }
+  return true;
+}
+
+bool KeyValueStateManager::persist() {
+  return storage_->persist();
+}
+
+bool KeyValueStateManager::isTransactionInProgress() const {
+  return transaction_in_progress_;
+}
+
+bool KeyValueStateManager::beginTransaction() {
+  if (transaction_in_progress_) {
+    return false;
+  }
+  transaction_in_progress_ = true;
+  return true;
+}
+
+bool KeyValueStateManager::commit() {
+  if (!transaction_in_progress_) {
+    return false;
+  }
+
+  bool success = true;
+
+  // actually make the pending changes
+  if (change_type_ == ChangeType::SET) {
+    if (storage_->set(id_.to_string(), KeyValueStateStorage::serialize(state_to_set_))) {
+      state_ = state_to_set_;
+    } else {
+      success = false;
+    }
+  } else if (change_type_ == ChangeType::CLEAR) {
+    if (state_ && storage_->remove(id_.to_string())) {
+      state_.reset();
+    } else {
+      success = false;
+    }
+  }
+
+  if (success && change_type_ != ChangeType::NONE) {
+    success = persist();
+  }
+
+  change_type_ = ChangeType::NONE;
+  state_to_set_.clear();
+  transaction_in_progress_ = false;
+  return success;
+}
+
+bool KeyValueStateManager::rollback() {
+  if (!transaction_in_progress_) {
+    return false;
+  }
+
+  change_type_ = ChangeType::NONE;
+  state_to_set_.clear();
+  transaction_in_progress_ = false;
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/KeyValueStateStorage.cpp b/libminifi/src/controllers/keyvalue/KeyValueStateStorage.cpp
new file mode 100644
index 0000000..e45fa21
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/KeyValueStateStorage.cpp
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "Exception.h"
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+#include "utils/gsl.h"
+
+#undef GetObject  // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson
+
+namespace org::apache::nifi::minifi::controllers {
+
+std::string KeyValueStateStorage::serialize(const core::StateManager::State& kvs) {
+  rapidjson::Document doc(rapidjson::kObjectType);
+  rapidjson::Document::AllocatorType &alloc = doc.GetAllocator();
+  for (const auto &kv : kvs) {
+    doc.AddMember(rapidjson::StringRef(kv.first.c_str(), kv.first.size()),
+                  rapidjson::StringRef(kv.second.c_str(), kv.second.size()), alloc);
+  }
+
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  doc.Accept(writer);
+
+  return buffer.GetString();
+}
+
+core::StateManager::State KeyValueStateStorage::deserialize(const std::string& serialized) {
+  rapidjson::Document doc;
+  rapidjson::ParseResult res = doc.Parse(serialized.c_str(), serialized.length());
+  if (!res || !doc.IsObject()) {
+    using org::apache::nifi::minifi::Exception;
+    using org::apache::nifi::minifi::FILE_OPERATION_EXCEPTION;
+    throw Exception(FILE_OPERATION_EXCEPTION, "Could not deserialize saved state, error during JSON parsing.");
+  }
+
+  core::StateManager::State retState;
+  for (const auto &kv : doc.GetObject()) {
+    retState[kv.name.GetString()] = kv.value.GetString();
+  }
+
+  return retState;
+}
+
+KeyValueStateStorage::KeyValueStateStorage(const std::string& name, const utils::Identifier& uuid)
+  : ControllerService(name, uuid) {
+}
+
+std::unique_ptr<core::StateManager> KeyValueStateStorage::getStateManager(const utils::Identifier& uuid) {
+  return std::make_unique<KeyValueStateManager>(uuid, gsl::make_not_null(this));
+}
+
+std::unordered_map<utils::Identifier, core::StateManager::State> KeyValueStateStorage::getAllStates() {
+  std::unordered_map<utils::Identifier, std::string> all_serialized;
+  if (!getAll(all_serialized)) {
+    return {};
+  }
+
+  std::unordered_map<utils::Identifier, core::StateManager::State> all_deserialized;
+  for (const auto& serialized : all_serialized) {
+    all_deserialized.emplace(serialized.first, deserialize(serialized.second));
+  }
+
+  return all_deserialized;
+}
+
+bool KeyValueStateStorage::getAll(std::unordered_map<utils::Identifier, std::string>& kvs) {
+  std::unordered_map<std::string, std::string> states;
+  if (!get(states)) {
+    return false;
+  }
+  kvs.clear();
+  for (const auto& state : states) {
+    const auto optional_uuid = utils::Identifier::parse(state.first);
+    if (optional_uuid) {
+      kvs[optional_uuid.value()] = state.second;
+    } else {
+      logger_->log_error("Found non-UUID key \"%s\" in storage implementation", state.first);
+    }
+  }
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
deleted file mode 100644
index 717ddbe..0000000
--- a/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "controllers/keyvalue/KeyValueStoreService.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-KeyValueStoreService::KeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
-    : ControllerService(std::move(name), uuid) {
-}
-
-KeyValueStoreService::~KeyValueStoreService() = default;
-
-void KeyValueStoreService::yield() {
-}
-
-bool KeyValueStoreService::isRunning() {
-  return getState() == core::controller::ControllerServiceState::ENABLED;
-}
-
-bool KeyValueStoreService::isWorkAvailable() {
-  return false;
-}
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
deleted file mode 100644
index 7a17c01..0000000
--- a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-PersistableKeyValueStoreService::PersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
-    : KeyValueStoreService(std::move(name), uuid) {
-}
-
-PersistableKeyValueStoreService::~PersistableKeyValueStoreService() = default;
-
-bool PersistableKeyValueStoreService::setImpl(const utils::Identifier& key, const std::string& serialized_state) {
-  return set(key.to_string(), serialized_state);
-}
-
-bool PersistableKeyValueStoreService::getImpl(const utils::Identifier& key, std::string& serialized_state) {
-  return get(key.to_string(), serialized_state);
-}
-
-bool PersistableKeyValueStoreService::getImpl(std::map<utils::Identifier, std::string>& kvs) {
-  std::unordered_map<std::string, std::string> states;
-  if (!get(states)) {
-    return false;
-  }
-  kvs.clear();
-  for (const auto& state : states) {
-    const auto optional_uuid = utils::Identifier::parse(state.first);
-    if (optional_uuid) {
-      kvs[optional_uuid.value()] = state.second;
-    } else {
-      core::logging::LoggerFactory<PersistableKeyValueStoreService>::getLogger()
-          ->log_error("Found non-UUID key \"%s\" in storage implementation", state.first);
-    }
-  }
-  return true;
-}
-
-bool PersistableKeyValueStoreService::removeImpl(const utils::Identifier& key) {
-  return remove(key.to_string());
-}
-
-bool PersistableKeyValueStoreService::persistImpl() {
-  return persist();
-}
-
-}  // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/test/StatefulProcessor.h b/libminifi/test/StatefulProcessor.h
index 2afefcd..0fcfe18 100644
--- a/libminifi/test/StatefulProcessor.h
+++ b/libminifi/test/StatefulProcessor.h
@@ -21,7 +21,7 @@
 #include <utility>
 #include <vector>
 #include "core/Processor.h"
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -41,13 +41,13 @@
   void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&) override;
 
-  using HookType = std::function<void(core::CoreComponentStateManager&)>;
+  using HookType = std::function<void(core::StateManager&)>;
   void setHooks(HookType onScheduleHook, std::vector<HookType> onTriggerHooks);
   [[nodiscard]] bool hasFinishedHooks() const;
 
  private:
   mutable std::mutex mutex_;
-  core::CoreComponentStateManager* state_manager_;
+  core::StateManager* state_manager_;
   HookType on_schedule_hook_;
   std::vector<HookType> on_trigger_hooks_;
   size_t on_trigger_hook_index_ = 0;
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 11246b7..b5c48dc 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -198,16 +198,16 @@
   stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
   controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
   controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
-  /* Inject the default state provider ahead of ProcessContext to make sure we have a unique state directory */
+  /* Inject the default state storage ahead of ProcessContext to make sure we have a unique state directory */
   if (state_dir == nullptr) {
     state_dir_ = std::make_unique<TempDirectory>();
   } else {
     state_dir_ = std::make_unique<TempDirectory>(state_dir);
   }
-  if (!configuration_->get(minifi::Configure::nifi_state_management_provider_local_path)) {
-    configuration_->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir_->getPath().string());
+  if (!configuration_->get(minifi::Configure::nifi_state_storage_local_path)) {
+    configuration_->set(minifi::Configure::nifi_state_storage_local_path, state_dir_->getPath().string());
   }
-  state_manager_provider_ = minifi::core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_.get(), configuration_);
+  state_storage_ = minifi::core::ProcessContext::getOrCreateDefaultStateStorage(controller_services_provider_.get(), configuration_);
 }
 
 TestPlan::~TestPlan() {
@@ -621,7 +621,7 @@
 std::shared_ptr<TestPlan> TestController::createPlan(PlanConfig config) {
   if (!config.configuration) {
     config.configuration = std::make_shared<minifi::Configure>();
-    config.configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
+    config.configuration->set(minifi::Configure::nifi_state_storage_local_class_name, "VolatileMapStateStorage");
     config.configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory().string());
   }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 58530a7..f698930 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -47,7 +47,7 @@
 namespace org::apache::nifi::minifi {
 class Connection;
 namespace core {
-class CoreComponentStateManagerProvider;
+class StateStorage;
 class ContentRepository;
 class FlowFile;
 class Processor;
@@ -275,8 +275,8 @@
     return state_dir_->getPath();
   }
 
-  [[nodiscard]] std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider() const {
-    return state_manager_provider_;
+  [[nodiscard]] std::shared_ptr<core::StateStorage> getStateStorage() const {
+    return state_storage_;
   }
 
   std::string getContent(const std::shared_ptr<const minifi::core::FlowFile>& file) const { return getContent(*file); }
@@ -303,7 +303,7 @@
   std::shared_ptr<minifi::core::controller::ControllerServiceMap> controller_services_;
   std::shared_ptr<minifi::core::controller::ControllerServiceProvider> controller_services_provider_;
 
-  std::shared_ptr<minifi::core::CoreComponentStateManagerProvider> state_manager_provider_;
+  std::shared_ptr<minifi::core::StateStorage> state_storage_;
 
   std::recursive_mutex mutex;
 
diff --git a/libminifi/test/flow-tests/CycleTest.cpp b/libminifi/test/flow-tests/CycleTest.cpp
index 889fd01..1be43c2 100644
--- a/libminifi/test/flow-tests/CycleTest.cpp
+++ b/libminifi/test/flow-tests/CycleTest.cpp
@@ -96,9 +96,9 @@
 Remote Processing Groups:
 
 Controller Services:
-  - name: defaultstatemanagerprovider
+  - name: defaultstatestorage
     id: 2438e3c8-015a-1000-79ca-83af40ec1994
-    class: UnorderedMapPersistableKeyValueStoreService
+    class: PersistentMapStateStorage
     Properties:
       Auto Persistence Interval:
           - value: 0 sec
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 464b12d..517c863 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -83,9 +83,9 @@
 Remote Processing Groups:
 
 Controller Services:
-  - name: defaultstatemanagerprovider
+  - name: defaultstatestorage
     id: 2438e3c8-015a-1000-79ca-83af40ec1995
-    class: UnorderedMapPersistableKeyValueStoreService
+    class: PersistentMapStateStorage
     Properties:
       Auto Persistence Interval:
           - value: 0 sec
diff --git a/libminifi/test/flow-tests/LoopTest.cpp b/libminifi/test/flow-tests/LoopTest.cpp
index aa7ca2c..0d2fd0b 100644
--- a/libminifi/test/flow-tests/LoopTest.cpp
+++ b/libminifi/test/flow-tests/LoopTest.cpp
@@ -74,9 +74,9 @@
 Remote Processing Groups:
 
 Controller Services:
-  - name: defaultstatemanagerprovider
+  - name: defaultstatestorage
     id: 2438e3c8-015a-1000-79ca-83af40ec1996
-    class: UnorderedMapPersistableKeyValueStoreService
+    class: PersistentMapStateStorage
     Properties:
       Auto Persistence Interval:
           - value: 0 sec
diff --git a/libminifi/test/flow-tests/MultiLoopTest.cpp b/libminifi/test/flow-tests/MultiLoopTest.cpp
index e30b077..a64dabb 100644
--- a/libminifi/test/flow-tests/MultiLoopTest.cpp
+++ b/libminifi/test/flow-tests/MultiLoopTest.cpp
@@ -83,9 +83,9 @@
 Remote Processing Groups:
 
 Controller Services:
-  - name: defaultstatemanagerprovider
+  - name: defaultstatestorage
     id: 2438e3c8-015a-1000-79ca-83af40ec1997
-    class: UnorderedMapPersistableKeyValueStoreService
+    class: PersistentMapStateStorage
     Properties:
       Auto Persistence Interval:
           - value: 0 sec
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 2b67454..8fd5a75 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -146,7 +146,7 @@
   if (test_file_location) {
     configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location->string());
   }
-  configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
+  configuration->set(minifi::Configure::nifi_state_storage_local_class_name, "VolatileMapStateStorage");
 
   configureC2();
   configureFullHeartbeat();
@@ -179,10 +179,10 @@
     auto controller_service_provider = flow_config->getControllerServiceProvider();
     char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
     state_dir = utils::file::create_temp_directory(state_dir_name_template);
-    if (!configuration->get(minifi::Configure::nifi_state_management_provider_local_path)) {
-      configuration->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir.string());
+    if (!configuration->get(minifi::Configure::nifi_state_storage_local_path)) {
+      configuration->set(minifi::Configure::nifi_state_storage_local_path, state_dir.string());
     }
-    core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration);
+    core::ProcessContext::getOrCreateDefaultStateStorage(controller_service_provider.get(), configuration);
 
     std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
     queryRootProcessGroup(pg);
diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp b/libminifi/test/integration/StateTransactionalityTests.cpp
index 299cf4c..97602a0 100644
--- a/libminifi/test/integration/StateTransactionalityTests.cpp
+++ b/libminifi/test/integration/StateTransactionalityTests.cpp
@@ -131,10 +131,10 @@
   {"State_is_recorded_after_committing", {
     {},
     {
-      [] (core::CoreComponentStateManager& stateManager) {
+      [] (core::StateManager& stateManager) {
         assert(stateManager.set(exampleState));
       },
-      [] (core::CoreComponentStateManager& stateManager) {
+      [] (core::StateManager& stateManager) {
         std::unordered_map<std::string, std::string> state;
         assert(stateManager.get(state));
         assert(state == exampleState);
@@ -145,14 +145,14 @@
   {"State_is_discarded_after_rolling_back", {
     {},
     {
-      [](core::CoreComponentStateManager& stateManager) {
+      [](core::StateManager& stateManager) {
         assert(stateManager.set(exampleState));
       },
-      [](core::CoreComponentStateManager& stateManager) {
+      [](core::StateManager& stateManager) {
         assert(stateManager.set(exampleState2));
         throw std::runtime_error("Triggering rollback");
       },
-      [](core::CoreComponentStateManager& stateManager) {
+      [](core::StateManager& stateManager) {
         std::unordered_map<std::string, std::string> state;
         assert(stateManager.get(state));
         assert(state == exampleState);
@@ -162,7 +162,7 @@
   }},
   {
     "Get_in_onSchedule_without_previous_state", {
-    [](core::CoreComponentStateManager& stateManager) {
+    [](core::StateManager& stateManager) {
       std::unordered_map<std::string, std::string> state;
       assert(!stateManager.get(state));
       assert(state.empty());
@@ -173,11 +173,11 @@
   },
   {
     "Set_in_onSchedule", {
-      [](core::CoreComponentStateManager& stateManager) {
+      [](core::StateManager& stateManager) {
         assert(stateManager.set(exampleState));
       },
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -188,13 +188,13 @@
   },
   {
     "Clear_in_onSchedule", {
-      [](core::CoreComponentStateManager& stateManager) {
+      [](core::StateManager& stateManager) {
         assert(!stateManager.clear());
         assert(stateManager.set(exampleState));
         assert(stateManager.clear());
       },
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(!stateManager.get(state));
           assert(state.empty());
@@ -206,7 +206,7 @@
   {
     "Persist_in_onSchedule", {
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
         }
       },
@@ -218,7 +218,7 @@
     "Manual_beginTransaction", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(!stateManager.beginTransaction());
         }
       },
@@ -229,7 +229,7 @@
     "Manual_commit", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
           assert(stateManager.commit());
         }
@@ -241,7 +241,7 @@
     "Manual_rollback", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.rollback());
         }
       },
@@ -252,7 +252,7 @@
     "Get_without_previous_state", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(!stateManager.get(state));
           assert(state.empty());
@@ -265,10 +265,10 @@
     "(set),(get,get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -283,10 +283,10 @@
     "(set),(get,set)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -300,10 +300,10 @@
     "(set),(get,clear)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -317,10 +317,10 @@
     "(set),(get,persist)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -334,7 +334,7 @@
     "(set,!get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
           std::unordered_map<std::string, std::string> state;
           assert(!stateManager.get(state));
@@ -348,7 +348,7 @@
     "(set,set)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
           assert(stateManager.set(exampleState));
         },
@@ -360,7 +360,7 @@
     "(set,!clear)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
           assert(!stateManager.clear());
         },
@@ -372,7 +372,7 @@
     "(set,persist)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
           assert(stateManager.persist());
         },
@@ -384,10 +384,10 @@
     "(set),(clear,!get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
           std::unordered_map<std::string, std::string> state;
           assert(!stateManager.get(state));
@@ -401,21 +401,21 @@
     "(set),(clear),(!get),(set),(get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(!stateManager.get(state));
           assert(state.empty());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -428,10 +428,10 @@
     "(set),(clear,set)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
           assert(stateManager.set(exampleState));
         },
@@ -443,13 +443,13 @@
     "(set),(clear),(!clear)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(!stateManager.clear());
         },
       },
@@ -460,13 +460,13 @@
     "(set),(clear),(persist)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
         },
       },
@@ -477,13 +477,13 @@
     "(persist),(set),(get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -496,13 +496,13 @@
     "(persist),(set),(clear)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
         },
       },
@@ -513,10 +513,10 @@
     "(persist),(persist)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
         },
       },
@@ -527,8 +527,8 @@
     "No_change_2_rounds", {
       {},
       {
-        [](core::CoreComponentStateManager&) {},
-        [](core::CoreComponentStateManager&) {},
+        [](core::StateManager&) {},
+        [](core::StateManager&) {},
       },
       standardLogChecker
     },
@@ -537,7 +537,7 @@
     "(!clear)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(!stateManager.clear());
         },
       },
@@ -548,10 +548,10 @@
     "(set),(get,throw)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -565,14 +565,14 @@
     "(set),(clear,throw),(get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.set(exampleState));
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.clear());
           throw std::runtime_error("Triggering rollback");
         },
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           std::unordered_map<std::string, std::string> state;
           assert(stateManager.get(state));
           assert(state == exampleState);
@@ -585,7 +585,7 @@
     "(set),(clear,throw),(get)", {
       {},
       {
-        [](core::CoreComponentStateManager& stateManager) {
+        [](core::StateManager& stateManager) {
           assert(stateManager.persist());
           throw std::runtime_error("Triggering rollback");
         },
diff --git a/libminifi/test/keyvalue-tests/CMakeLists.txt b/libminifi/test/keyvalue-tests/CMakeLists.txt
index 83961cb..b8c42bb 100644
--- a/libminifi/test/keyvalue-tests/CMakeLists.txt
+++ b/libminifi/test/keyvalue-tests/CMakeLists.txt
@@ -36,8 +36,8 @@
 
 message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller test file(s)...")
 
-add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml")
+add_test(NAME PersistentMapStateStorageTest COMMAND PersistentStateStorageTest --config-yaml "${TEST_RESOURCES}/PersistentMapStateStorage.yml")
 if (NOT DISABLE_ROCKSDB)
-    add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml")
+    add_test(NAME RocksDbStateStorageTest COMMAND PersistentStateStorageTest --config-yaml "${TEST_RESOURCES}/RocksDbStateStorage.yml")
 endif()
-add_test(NAME UnorderedMapKeyValueStoreServiceTest COMMAND UnorderedMapKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapKeyValueStoreServiceTest.yml")
+add_test(NAME VolatileMapStateStorageTest COMMAND VolatileMapStateStorageTest --config-yaml "${TEST_RESOURCES}/VolatileMapStateStorage.yml")
diff --git a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/PersistentStateStorageTest.cpp
similarity index 80%
rename from libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
rename to libminifi/test/keyvalue-tests/PersistentStateStorageTest.cpp
index 33e02d9..6c945ef 100644
--- a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/PersistentStateStorageTest.cpp
@@ -28,8 +28,8 @@
 #include "core/controller/ControllerService.h"
 #include "core/ProcessGroup.h"
 #include "core/yaml/YamlConfiguration.h"
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+#include "controllers/keyvalue/AutoPersistor.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "repository/VolatileContentRepository.h"
 
@@ -41,7 +41,7 @@
   auto cli = session.cli()
       | Catch::clara::Opt{config_yaml, "config-yaml"}
           ["--config-yaml"]
-          ("path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration");
+          ("path to the config.yaml containing the StateStorage controller service configuration");
   session.cli(cli);
 
   int ret = session.applyCommandLine(argc, argv);
@@ -50,25 +50,25 @@
   }
 
   if (config_yaml.empty()) {
-    std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration." << std::endl;
+    std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the StateStorage controller service configuration." << std::endl;
     return -1;
   }
 
   return session.run();
 }
 
-class PersistableKeyValueStoreServiceTestsFixture {
+class PersistentStateStorageTestsFixture {
  public:
-  PersistableKeyValueStoreServiceTestsFixture() {
+  PersistentStateStorageTestsFixture() {
     LogTestController::getInstance().setTrace<TestPlan>();
-    LogTestController::getInstance().setTrace<minifi::controllers::PersistableKeyValueStoreService>();
-    LogTestController::getInstance().setTrace<minifi::controllers::AbstractAutoPersistingKeyValueStoreService>();
+    LogTestController::getInstance().setTrace<minifi::controllers::KeyValueStateStorage>();
+    LogTestController::getInstance().setTrace<minifi::controllers::AutoPersistor>();
 
     std::filesystem::current_path(testController.createTempDirectory());
     loadYaml();
   }
 
-  virtual ~PersistableKeyValueStoreServiceTestsFixture() {
+  virtual ~PersistentStateStorageTestsFixture() {
     LogTestController::getInstance().reset();
   }
 
@@ -102,7 +102,7 @@
     REQUIRE(persistable_key_value_store_service_node != nullptr);
     persistable_key_value_store_service_node->enable();
 
-    controller = std::dynamic_pointer_cast<minifi::controllers::PersistableKeyValueStoreService>(
+    controller = std::dynamic_pointer_cast<minifi::controllers::KeyValueStateStorage>(
         persistable_key_value_store_service_node->getControllerServiceImplementation());
     REQUIRE(controller != nullptr);
   }
@@ -118,13 +118,13 @@
   std::unique_ptr<core::ProcessGroup> process_group;
 
   std::shared_ptr<core::controller::ControllerServiceNode> persistable_key_value_store_service_node;
-  std::shared_ptr<minifi::controllers::PersistableKeyValueStoreService> controller;
+  std::shared_ptr<minifi::controllers::KeyValueStateStorage> controller;
 
   TestController testController;
 };
 
 
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and get", "[basic]") {
   const char* key = "foobar";
   const char* value = "234";
   REQUIRE(true == controller->set(key, value));
@@ -141,7 +141,7 @@
   REQUIRE(value == res);
 }
 
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture special characters", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture special characters", "[basic]") {
   const char* key = "[]{}()==\\=\n\n";
   const char* value = ":./'\\=!\n=[]{}()";
   REQUIRE(true == controller->set(key, value));
@@ -158,7 +158,7 @@
   REQUIRE(value == res);
 }
 
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get all", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and get all", "[basic]") {
   const std::unordered_map<std::string, std::string> kvs = {
       {"foobar", "234"},
       {"buzz", "value"},
@@ -179,7 +179,7 @@
   REQUIRE(kvs == kvs_res);
 }
 
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and overwrite", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and overwrite", "[basic]") {
   const char* key = "foobar";
   const char* value = "234";
   const char* new_value = "baz";
@@ -198,7 +198,7 @@
   REQUIRE(new_value == res);
 }
 
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and remove", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and remove", "[basic]") {
   const char* key = "foobar";
   const char* value = "234";
   REQUIRE(true == controller->set(key, value));
@@ -216,7 +216,7 @@
 }
 
 
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and clear", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and clear", "[basic]") {
   const std::unordered_map<std::string, std::string> kvs = {
       {"foobar", "234"},
       {"buzz", "value"},
diff --git a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/VolatileMapStateStorageTest.cpp
similarity index 78%
rename from libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
rename to libminifi/test/keyvalue-tests/VolatileMapStateStorageTest.cpp
index 6f0dc9f..6edc57d 100644
--- a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/VolatileMapStateStorageTest.cpp
@@ -21,14 +21,13 @@
 #include <string>
 #include "../TestBase.h"
 #include "../Catch.h"
-#include "core/controller/ControllerService.h"
+#include "controllers/keyvalue/AutoPersistor.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
 #include "core/ProcessGroup.h"
 #include "core/yaml/YamlConfiguration.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "repository/VolatileContentRepository.h"
 
-#include "Catch.h"
-
 namespace {
   std::string config_yaml; // NOLINT
 }
@@ -39,7 +38,7 @@
   auto cli = session.cli()
       | Catch::clara::Opt{config_yaml, "config-yaml"}
           ["--config-yaml"]
-          ("path to the config.yaml containing the UnorderedMapKeyValueStoreServiceTest controller service configuration");
+          ("path to the config.yaml containing the VolatileMapStateStorageTest controller service configuration");
   session.cli(cli);
 
   int ret = session.applyCommandLine(argc, argv);
@@ -48,19 +47,19 @@
   }
 
   if (config_yaml.empty()) {
-    std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the UnorderedMapKeyValueStoreServiceTest controller service configuration." << std::endl;
+    std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the VolatileMapStateStorageTest controller service configuration." << std::endl;
     return -1;
   }
 
   return session.run();
 }
 
-class UnorderedMapKeyValueStoreServiceTestFixture {
+class VolatileMapStateStorageTestFixture {
  public:
-  UnorderedMapKeyValueStoreServiceTestFixture() {
+  VolatileMapStateStorageTestFixture() {
     LogTestController::getInstance().setTrace<TestPlan>();
-    LogTestController::getInstance().setTrace<minifi::controllers::PersistableKeyValueStoreService>();
-    LogTestController::getInstance().setTrace<minifi::controllers::AbstractAutoPersistingKeyValueStoreService>();
+    LogTestController::getInstance().setTrace<minifi::controllers::KeyValueStateStorage>();
+    LogTestController::getInstance().setTrace<minifi::controllers::AutoPersistor>();
 
     std::filesystem::current_path(testController.createTempDirectory());
 
@@ -72,12 +71,12 @@
     REQUIRE(key_value_store_service_node != nullptr);
     key_value_store_service_node->enable();
 
-    controller = std::dynamic_pointer_cast<minifi::controllers::PersistableKeyValueStoreService>(
+    controller = std::dynamic_pointer_cast<minifi::controllers::KeyValueStateStorage>(
             key_value_store_service_node->getControllerServiceImplementation());
     REQUIRE(controller != nullptr);
   }
 
-  virtual ~UnorderedMapKeyValueStoreServiceTestFixture() {
+  virtual ~VolatileMapStateStorageTestFixture() {
     LogTestController::getInstance().reset();
   }
 
@@ -93,12 +92,12 @@
   std::unique_ptr<core::ProcessGroup> process_group;
 
   std::shared_ptr<core::controller::ControllerServiceNode> key_value_store_service_node;
-  std::shared_ptr<minifi::controllers::PersistableKeyValueStoreService> controller;
+  std::shared_ptr<minifi::controllers::KeyValueStateStorage> controller;
 
   TestController testController;
 };
 
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTest set and get", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTest set and get", "[basic]") {
   const char* key = "foobar";
   const char* value = "234";
   REQUIRE(true == controller->set(key, value));
@@ -108,7 +107,7 @@
   REQUIRE(value == res);
 }
 
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and get all", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and get all", "[basic]") {
   const std::unordered_map<std::string, std::string> kvs = {
           {"foobar", "234"},
           {"buzz", "value"},
@@ -122,7 +121,7 @@
   REQUIRE(kvs == kvs_res);
 }
 
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and overwrite", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and overwrite", "[basic]") {
   const char* key = "foobar";
   const char* value = "234";
   const char* new_value = "baz";
@@ -134,7 +133,7 @@
   REQUIRE(new_value == res);
 }
 
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and remove", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and remove", "[basic]") {
   const char* key = "foobar";
   const char* value = "234";
   REQUIRE(true == controller->set(key, value));
@@ -145,7 +144,7 @@
 }
 
 
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and clear", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and clear", "[basic]") {
   const std::unordered_map<std::string, std::string> kvs = {
           {"foobar", "234"},
           {"buzz", "value"},
diff --git a/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml b/libminifi/test/resources/PersistentMapStateStorage.yml
similarity index 94%
rename from libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
rename to libminifi/test/resources/PersistentMapStateStorage.yml
index 61d8f26..e1c11a4 100644
--- a/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
+++ b/libminifi/test/resources/PersistentMapStateStorage.yml
@@ -26,7 +26,7 @@
 Controller Services:
     - name: testcontroller
       id: 2438e3c8-015a-1000-79ca-83af40ec1994
-      class: UnorderedMapPersistableKeyValueStoreService
+      class: PersistentMapStateStorage
       Properties:
         Auto Persistence Interval:
             - value: 0 sec
diff --git a/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml b/libminifi/test/resources/RocksDbStateStorage.yml
similarity index 95%
rename from libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
rename to libminifi/test/resources/RocksDbStateStorage.yml
index 5819063..6600445 100644
--- a/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
+++ b/libminifi/test/resources/RocksDbStateStorage.yml
@@ -26,7 +26,7 @@
 Controller Services:
     - name: testcontroller
       id: 2438e3c8-015a-1000-79ca-83af40ec1994
-      class: RocksDbPersistableKeyValueStoreService
+      class: RocksDbStateStorage
       Properties:
         Auto Persistence Interval:
             - value: 0 sec
diff --git a/libminifi/test/resources/UnorderedMapKeyValueStoreServiceTest.yml b/libminifi/test/resources/VolatileMapStateStorage.yml
similarity index 95%
rename from libminifi/test/resources/UnorderedMapKeyValueStoreServiceTest.yml
rename to libminifi/test/resources/VolatileMapStateStorage.yml
index 45a6171..7f372cc 100644
--- a/libminifi/test/resources/UnorderedMapKeyValueStoreServiceTest.yml
+++ b/libminifi/test/resources/VolatileMapStateStorage.yml
@@ -26,6 +26,6 @@
 Controller Services:
     - name: testcontroller
       id: 2438e3c8-015a-1000-79ca-83af40ec1994
-      class: UnorderedMapKeyValueStoreService
+      class: VolatileMapStateStorage
 
 Remote Processing Groups: []