MINIFICPP-1309 - RAII based resource ownership

MINIFICPP-1309 - Use path as resource id

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #857
diff --git a/extensions/expression-language/tests/ExpressionLanguageTests.cpp b/extensions/expression-language/tests/ExpressionLanguageTests.cpp
index d61a003..3a08ad5 100644
--- a/extensions/expression-language/tests/ExpressionLanguageTests.cpp
+++ b/extensions/expression-language/tests/ExpressionLanguageTests.cpp
@@ -38,11 +38,6 @@
 
 namespace expression = org::apache::nifi::minifi::expression;
 
-class MockFlowFile : public core::FlowFile {
-  void releaseClaim(const std::shared_ptr<minifi::ResourceClaim> claim) override {
-  }
-};
-
 TEST_CASE("Trivial static expression", "[expressionLanguageTestTrivialStaticExpr]") {  // NOLINT
   REQUIRE("a" == expression::make_static("a")({ }).asString());
 }
@@ -58,7 +53,7 @@
 }
 
 TEST_CASE("Attribute expression", "[expressionLanguageTestAttributeExpression]") {  // NOLINT
-  auto flow_file = std::make_shared<MockFlowFile>();
+  auto flow_file = std::make_shared<core::FlowFile>();
   flow_file->addAttribute("attr_a", "__attr_value_a__");
   auto expr = expression::compile("text_before${attr_a}text_after");
   REQUIRE("text_before__attr_value_a__text_after" == expr({ flow_file }).asString());
@@ -66,12 +61,12 @@
 
 TEST_CASE("Attribute expression (Null)", "[expressionLanguageTestAttributeExpressionNull]") {  // NOLINT
   auto expr = expression::compile("text_before${attr_a}text_after");
-  std::shared_ptr<MockFlowFile> flow_file = nullptr;
-  REQUIRE("text_beforetext_after" == expr({ flow_file }).asString());
+  std::shared_ptr<core::FlowFile> flow_file = nullptr;
+  REQUIRE("text_beforetext_after" == expr( { flow_file }).asString());
 }
 
 TEST_CASE("Multi-attribute expression", "[expressionLanguageTestMultiAttributeExpression]") {  // NOLINT
-  auto flow_file = std::make_shared<MockFlowFile>();
+  auto flow_file = std::make_shared<core::FlowFile>();
   flow_file->addAttribute("attr_a", "__attr_value_a__");
   flow_file->addAttribute("attr_b", "__attr_value_b__");
   auto expr = expression::compile("text_before${attr_a}text_between${attr_b}text_after");
@@ -82,17 +77,17 @@
     "[expressionLanguageTestMultiFlowfileAttributeExpression]") {  // NOLINT
   auto expr = expression::compile("text_before${attr_a}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
   REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
 
-  auto flow_file_b = std::make_shared<MockFlowFile>();
+  auto flow_file_b = std::make_shared<core::FlowFile>();
   flow_file_b->addAttribute("attr_a", "__flow_b_attr_value_a__");
   REQUIRE("text_before__flow_b_attr_value_a__text_after" == expr({ flow_file_b }).asString());
 }
 
 TEST_CASE("Attribute expression with whitespace", "[expressionLanguageTestAttributeExpressionWhitespace]") {  // NOLINT
-  auto flow_file = std::make_shared<MockFlowFile>();
+  auto flow_file = std::make_shared<core::FlowFile>();
   flow_file->addAttribute("attr_a", "__attr_value_a__");
   auto expr = expression::compile("text_before${\n\tattr_a \r}text_after");
   REQUIRE("text_before__attr_value_a__text_after" == expr({ flow_file }).asString());
@@ -101,7 +96,7 @@
 TEST_CASE("Special characters expression", "[expressionLanguageTestSpecialCharactersExpression]") {  // NOLINT
   auto expr = expression::compile("text_before|{}()[],:;\\/*#'\" \t\r\n${attr_a}}()text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
   REQUIRE("text_before|{}()[],:;\\/*#'\" \t\r\n__flow_a_attr_value_a__}()text_after" == expr({ flow_file_a }).asString());
 }
@@ -109,7 +104,7 @@
 TEST_CASE("UTF-8 characters expression", "[expressionLanguageTestUTF8Expression]") {  // NOLINT
   auto expr = expression::compile("text_before¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹${attr_a}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
   REQUIRE("text_before¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
 }
@@ -117,7 +112,7 @@
 TEST_CASE("UTF-8 characters attribute", "[expressionLanguageTestUTF8Attribute]") {  // NOLINT
   auto expr = expression::compile("text_before${attr_a}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹__");
   REQUIRE("text_before__¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹__text_after" == expr({ flow_file_a }).asString());
 }
@@ -125,7 +120,7 @@
 TEST_CASE("Single quoted attribute expression", "[expressionLanguageTestSingleQuotedAttributeExpression]") {  // NOLINT
   auto expr = expression::compile("text_before${'|{}()[],:;\\\\/*# \t\r\n$'}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("|{}()[],:;\\/*# \t\r\n$", "__flow_a_attr_value_a__");
   REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
 }
@@ -133,7 +128,7 @@
 TEST_CASE("Double quoted attribute expression", "[expressionLanguageTestDoubleQuotedAttributeExpression]") {  // NOLINT
   auto expr = expression::compile("text_before${\"|{}()[],:;\\\\/*# \t\r\n$\"}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("|{}()[],:;\\/*# \t\r\n$", "__flow_a_attr_value_a__");
   REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
 }
@@ -148,22 +143,22 @@
   expected.append(hostname);
   expected.append("text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
-  REQUIRE(expected == expr({ flow_file_a }).asString());
+  auto flow_file_a = std::make_shared<core::FlowFile>();
+  REQUIRE(expected == expr( { flow_file_a }).asString());
 }
 
 TEST_CASE("ToUpper function", "[expressionLanguageTestToUpperFunction]") {  // NOLINT
   auto expr = expression::compile(R"(text_before${
                                        attr_a : toUpper()
                                      }text_after)");
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
   REQUIRE("text_before__FLOW_A_ATTR_VALUE_A__text_after" == expr({ flow_file_a }).asString());
 }
 
 TEST_CASE("ToUpper function w/o whitespace", "[expressionLanguageTestToUpperFunctionWithoutWhitespace]") {  // NOLINT
   auto expr = expression::compile(R"(text_before${attr_a:toUpper()}text_after)");
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
   REQUIRE("text_before__FLOW_A_ATTR_VALUE_A__text_after" == expr({ flow_file_a }).asString());
 }
@@ -172,7 +167,7 @@
   auto expr = expression::compile(R"(text_before${
                                        attr_a : toLower()
                                      }text_after)");
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr_a", "__FLOW_A_ATTR_VALUE_A__");
   REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
 }
@@ -250,7 +245,7 @@
 TEST_CASE("Substring 2 arg", "[expressionLanguageSubstring2]") {  // NOLINT
   auto expr = expression::compile("text_before${attr:substring(6, 8)}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
   REQUIRE("text_before_a_attr_text_after" == expr({ flow_file_a }).asString());
 }
@@ -258,7 +253,7 @@
 TEST_CASE("Substring 1 arg", "[expressionLanguageSubstring1]") {  // NOLINT
   auto expr = expression::compile("text_before${attr:substring(6)}text_after");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
   REQUIRE("text_before_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
 }
@@ -266,7 +261,7 @@
 TEST_CASE("Substring Before", "[expressionLanguageSubstringBefore]") {  // NOLINT
   auto expr = expression::compile("${attr:substringBefore('attr_value_a__')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
   REQUIRE("__flow_a_" == expr({ flow_file_a }).asString());
 }
@@ -274,7 +269,7 @@
 TEST_CASE("Substring Before Last", "[expressionLanguageSubstringBeforeLast]") {  // NOLINT
   auto expr = expression::compile("${attr:substringBeforeLast('_a')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
   REQUIRE("__flow_a_attr_value" == expr({ flow_file_a }).asString());
 }
@@ -282,7 +277,7 @@
 TEST_CASE("Substring After", "[expressionLanguageSubstringAfter]") {  // NOLINT
   auto expr = expression::compile("${attr:substringAfter('__flow_a')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
   REQUIRE("_attr_value_a__" == expr({ flow_file_a }).asString());
 }
@@ -290,7 +285,7 @@
 TEST_CASE("Substring After Last", "[expressionLanguageSubstringAfterLast]") {  // NOLINT
   auto expr = expression::compile("${attr:substringAfterLast('_a')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
   REQUIRE("__" == expr({ flow_file_a }).asString());
 }
@@ -298,7 +293,7 @@
 TEST_CASE("Get Delimited", "[expressionLanguageGetDelimited]") {  // NOLINT
   auto expr = expression::compile("${attr:getDelimitedField(2)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "\"Jacobson, John\", 32, Mr.");
   REQUIRE(" 32" == expr({ flow_file_a }).asString());
 }
@@ -306,7 +301,7 @@
 TEST_CASE("Get Delimited 2", "[expressionLanguageGetDelimited2]") {  // NOLINT
   auto expr = expression::compile("${attr:getDelimitedField(1)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "\"Jacobson, John\", 32, Mr.");
   REQUIRE("\"Jacobson, John\"" == expr({ flow_file_a }).asString());
 }
@@ -314,7 +309,7 @@
 TEST_CASE("Get Delimited 3", "[expressionLanguageGetDelimited3]") {  // NOLINT
   auto expr = expression::compile("${attr:getDelimitedField(1, ',', '\\\"', '\\\\', 'true')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "\"Jacobson, John\", 32, Mr.");
   REQUIRE("Jacobson, John" == expr({ flow_file_a }).asString());
 }
@@ -322,7 +317,7 @@
 TEST_CASE("Starts With", "[expressionLanguageStartsWith]") {  // NOLINT
   auto expr = expression::compile("${attr:startsWith('a brand')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "A BRAND TEST");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -330,7 +325,7 @@
 TEST_CASE("Starts With 2", "[expressionLanguageStartsWith2]") {  // NOLINT
   auto expr = expression::compile("${attr:startsWith('a brand')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand TEST");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -338,7 +333,7 @@
 TEST_CASE("Ends With", "[expressionLanguageEndsWith]") {  // NOLINT
   auto expr = expression::compile("${attr:endsWith('txt')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.TXT");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -346,7 +341,7 @@
 TEST_CASE("Ends With 2", "[expressionLanguageEndsWith2]") {  // NOLINT
   auto expr = expression::compile("${attr:endsWith('TXT')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.TXT");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -354,7 +349,7 @@
 TEST_CASE("Contains", "[expressionLanguageContains]") {  // NOLINT
   auto expr = expression::compile("${attr:contains('new')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -362,7 +357,7 @@
 TEST_CASE("Contains 2", "[expressionLanguageContains2]") {  // NOLINT
   auto expr = expression::compile("${attr:contains('NEW')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -370,7 +365,7 @@
 TEST_CASE("In", "[expressionLanguageIn]") {  // NOLINT
   auto expr = expression::compile("${attr:in('PAUL', 'JOHN', 'MIKE')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "JOHN");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -378,7 +373,7 @@
 TEST_CASE("In 2", "[expressionLanguageIn2]") {  // NOLINT
   auto expr = expression::compile("${attr:in('RED', 'GREEN', 'BLUE')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "JOHN");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -399,7 +394,7 @@
 TEST_CASE("Replace", "[expressionLanguageReplace]") {  // NOLINT
   auto expr = expression::compile("${attr:replace('.', '_')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a brand new filename_txt" == expr({ flow_file_a }).asString());
 }
@@ -407,7 +402,7 @@
 TEST_CASE("Replace 2", "[expressionLanguageReplace2]") {  // NOLINT
   auto expr = expression::compile("${attr:replace(' ', '.')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a.brand.new.filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -415,7 +410,7 @@
 TEST_CASE("Replace First", "[expressionLanguageReplaceFirst]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceFirst('a', 'the')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("the brand new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -423,7 +418,7 @@
 TEST_CASE("Replace First Regex", "[expressionLanguageReplaceFirstRegex]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceFirst('[br]', 'g')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a grand new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -431,7 +426,7 @@
 TEST_CASE("Replace All", "[expressionLanguageReplaceAll]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceAll('\\\\..*', '')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a brand new filename" == expr({ flow_file_a }).asString());
 }
@@ -439,7 +434,7 @@
 TEST_CASE("Replace All 2", "[expressionLanguageReplaceAll2]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceAll('a brand (new)', '$1')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -447,7 +442,7 @@
 TEST_CASE("Replace All 3", "[expressionLanguageReplaceAll3]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceAll('XYZ', 'ZZZ')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -455,7 +450,7 @@
 TEST_CASE("Replace Null", "[expressionLanguageReplaceNull]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceNull('abc')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -463,7 +458,7 @@
 TEST_CASE("Replace Null 2", "[expressionLanguageReplaceNull2]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceNull('abc')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr2", "a brand new filename.txt");
   REQUIRE("abc" == expr({ flow_file_a }).asString());
 }
@@ -471,7 +466,7 @@
 TEST_CASE("Replace Empty", "[expressionLanguageReplaceEmpty]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceEmpty('abc')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -479,7 +474,7 @@
 TEST_CASE("Replace Empty 2", "[expressionLanguageReplaceEmpty2]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceEmpty('abc')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "  \t  \r  \n  ");
   REQUIRE("abc" == expr({ flow_file_a }).asString());
 }
@@ -487,7 +482,7 @@
 TEST_CASE("Replace Empty 3", "[expressionLanguageReplaceEmpty2]") {  // NOLINT
   auto expr = expression::compile("${attr:replaceEmpty('abc')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr2", "test");
   REQUIRE("abc" == expr({ flow_file_a }).asString());
 }
@@ -495,7 +490,7 @@
 TEST_CASE("Matches", "[expressionLanguageMatches]") {  // NOLINT
   auto expr = expression::compile("${attr:matches('^(Ct|Bt|At):.*t$')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "At:est");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -503,7 +498,7 @@
 TEST_CASE("Matches 2", "[expressionLanguageMatches2]") {  // NOLINT
   auto expr = expression::compile("${attr:matches('^(Ct|Bt|At):.*t$')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "At:something");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -511,7 +506,7 @@
 TEST_CASE("Matches 3", "[expressionLanguageMatches3]") {  // NOLINT
   auto expr = expression::compile("${attr:matches('(Ct|Bt|At):.*t')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", " At:est");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -519,7 +514,7 @@
 TEST_CASE("Find", "[expressionLanguageFind]") {  // NOLINT
   auto expr = expression::compile("${attr:find('a [Bb]rand [Nn]ew')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -527,7 +522,7 @@
 TEST_CASE("Find 2", "[expressionLanguageFind2]") {  // NOLINT
   auto expr = expression::compile("${attr:find('Brand.*')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -535,7 +530,7 @@
 TEST_CASE("Find 3", "[expressionLanguageFind3]") {  // NOLINT
   auto expr = expression::compile("${attr:find('brand')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -543,7 +538,7 @@
 TEST_CASE("IndexOf", "[expressionLanguageIndexOf]") {  // NOLINT
   auto expr = expression::compile("${attr:indexOf('a.*txt')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("-1" == expr({ flow_file_a }).asString());
 }
@@ -551,7 +546,7 @@
 TEST_CASE("IndexOf2", "[expressionLanguageIndexOf2]") {  // NOLINT
   auto expr = expression::compile("${attr:indexOf('.')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("20" == expr({ flow_file_a }).asString());
 }
@@ -559,7 +554,7 @@
 TEST_CASE("IndexOf3", "[expressionLanguageIndexOf3]") {  // NOLINT
   auto expr = expression::compile("${attr:indexOf('a')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("0" == expr({ flow_file_a }).asString());
 }
@@ -567,7 +562,7 @@
 TEST_CASE("IndexOf4", "[expressionLanguageIndexOf4]") {  // NOLINT
   auto expr = expression::compile("${attr:indexOf(' ')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("1" == expr({ flow_file_a }).asString());
 }
@@ -575,7 +570,7 @@
 TEST_CASE("LastIndexOf", "[expressionLanguageLastIndexOf]") {  // NOLINT
   auto expr = expression::compile("${attr:lastIndexOf('a.*txt')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("-1" == expr({ flow_file_a }).asString());
 }
@@ -583,7 +578,7 @@
 TEST_CASE("LastIndexOf2", "[expressionLanguageLastIndexOf2]") {  // NOLINT
   auto expr = expression::compile("${attr:lastIndexOf('.')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("20" == expr({ flow_file_a }).asString());
 }
@@ -591,7 +586,7 @@
 TEST_CASE("LastIndexOf3", "[expressionLanguageLastIndexOf3]") {  // NOLINT
   auto expr = expression::compile("${attr:lastIndexOf('a')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("17" == expr({ flow_file_a }).asString());
 }
@@ -599,7 +594,7 @@
 TEST_CASE("LastIndexOf4", "[expressionLanguageLastIndexOf4]") {  // NOLINT
   auto expr = expression::compile("${attr:lastIndexOf(' ')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "a brand new filename.txt");
   REQUIRE("11" == expr({ flow_file_a }).asString());
 }
@@ -609,7 +604,7 @@
 TEST_CASE("Plus Integer", "[expressionLanguagePlusInteger]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(13)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11");
   REQUIRE("24" == expr({ flow_file_a }).asString());
 }
@@ -617,7 +612,7 @@
 TEST_CASE("Plus Decimal", "[expressionLanguagePlusDecimal]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(-13.34567)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11.1");
   REQUIRE("-2.24567" == expr({ flow_file_a }).asString());
 }
@@ -625,7 +620,7 @@
 TEST_CASE("Plus Exponent", "[expressionLanguagePlusExponent]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(10e+6)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11");
   REQUIRE("10000011" == expr({ flow_file_a }).asString());
 }
@@ -633,7 +628,7 @@
 TEST_CASE("Plus Exponent 2", "[expressionLanguagePlusExponent2]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(10e+6)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11.345678901234");
   REQUIRE("10000011.345678901234351" == expr({ flow_file_a }).asString());
 }
@@ -641,7 +636,7 @@
 TEST_CASE("Minus Integer", "[expressionLanguageMinusInteger]") {  // NOLINT
   auto expr = expression::compile("${attr:minus(13)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11");
   REQUIRE("-2" == expr({ flow_file_a }).asString());
 }
@@ -649,7 +644,7 @@
 TEST_CASE("Minus Decimal", "[expressionLanguageMinusDecimal]") {  // NOLINT
   auto expr = expression::compile("${attr:minus(-13.34567)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11.1");
   REQUIRE("24.44567" == expr({ flow_file_a }).asString());
 }
@@ -657,7 +652,7 @@
 TEST_CASE("Multiply Integer", "[expressionLanguageMultiplyInteger]") {  // NOLINT
   auto expr = expression::compile("${attr:multiply(13)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11");
   REQUIRE("143" == expr({ flow_file_a }).asString());
 }
@@ -665,7 +660,7 @@
 TEST_CASE("Multiply Decimal", "[expressionLanguageMultiplyDecimal]") {  // NOLINT
   auto expr = expression::compile("${attr:multiply(-13.34567)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11.1");
   REQUIRE("-148.136937" == expr({ flow_file_a }).asString());
 }
@@ -673,7 +668,7 @@
 TEST_CASE("Divide Integer", "[expressionLanguageDivideInteger]") {  // NOLINT
   auto expr = expression::compile("${attr:divide(13)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11");
   REQUIRE("0.846153846153846" == expr({ flow_file_a }).asString());
 }
@@ -681,7 +676,7 @@
 TEST_CASE("Divide Decimal", "[expressionLanguageDivideDecimal]") {  // NOLINT
   auto expr = expression::compile("${attr:divide(-13.34567)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "11.1");
   REQUIRE("-0.831730441409086" == expr({ flow_file_a }).asString());
 }
@@ -689,7 +684,7 @@
 TEST_CASE("To Radix", "[expressionLanguageToRadix]") {  // NOLINT
   auto expr = expression::compile("${attr:toRadix(2,16)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "10");
   REQUIRE("0000000000001010" == expr({ flow_file_a }).asString());
 }
@@ -697,7 +692,7 @@
 TEST_CASE("To Radix 2", "[expressionLanguageToRadix2]") {  // NOLINT
   auto expr = expression::compile("${attr:toRadix(16)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "13");
   REQUIRE("d" == expr({ flow_file_a }).asString());
 }
@@ -705,7 +700,7 @@
 TEST_CASE("To Radix 3", "[expressionLanguageToRadix3]") {  // NOLINT
   auto expr = expression::compile("${attr:toRadix(23,8)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "-2347");
   REQUIRE("-000004a1" == expr({ flow_file_a }).asString());
 }
@@ -713,7 +708,7 @@
 TEST_CASE("From Radix", "[expressionLanguageFromRadix]") {  // NOLINT
   auto expr = expression::compile("${attr:fromRadix(2)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "0000000000001010");
   REQUIRE("10" == expr({ flow_file_a }).asString());
 }
@@ -721,7 +716,7 @@
 TEST_CASE("From Radix 2", "[expressionLanguageFromRadix2]") {  // NOLINT
   auto expr = expression::compile("${attr:fromRadix(16)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "d");
   REQUIRE("13" == expr({ flow_file_a }).asString());
 }
@@ -729,7 +724,7 @@
 TEST_CASE("From Radix 3", "[expressionLanguageFromRadix3]") {  // NOLINT
   auto expr = expression::compile("${attr:fromRadix(23)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "-000004a1");
   REQUIRE("-2347" == expr({ flow_file_a }).asString());
 }
@@ -737,15 +732,15 @@
 TEST_CASE("Random", "[expressionLanguageRandom]") {  // NOLINT
   auto expr = expression::compile("${random()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
-  auto result = expr({ flow_file_a }).asSignedLong();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
+  auto result = expr( { flow_file_a }).asSignedLong();
   REQUIRE(result > 0);
 }
 
 TEST_CASE("Chained call", "[expressionChainedCall]") {  // NOLINT
   auto expr = expression::compile("${attr:multiply(3):plus(1)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE("22" == expr({ flow_file_a }).asString());
 }
@@ -753,14 +748,14 @@
 TEST_CASE("Chained call 2", "[expressionChainedCall2]") {  // NOLINT
   auto expr = expression::compile("${literal(10):multiply(2):plus(1):multiply(2)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
-  REQUIRE(42 == expr({ flow_file_a }).asSignedLong());
+  auto flow_file_a = std::make_shared<core::FlowFile>();
+  REQUIRE(42 == expr( { flow_file_a }).asSignedLong());
 }
 
 TEST_CASE("Chained call 3", "[expressionChainedCall3]") {  // NOLINT
   auto expr = expression::compile("${literal(10):multiply(2):plus(${attr:multiply(2)}):multiply(${attr})}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE("238" == expr({ flow_file_a }).asString());
 }
@@ -768,7 +763,7 @@
 TEST_CASE("LiteralBool", "[expressionLiteralBool]") {  // NOLINT
   auto expr = expression::compile("${literal(true)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE(true == expr({ flow_file_a }).asBoolean());
 }
@@ -776,7 +771,7 @@
 TEST_CASE("LiteralBool 2", "[expressionLiteralBool2]") {  // NOLINT
   auto expr = expression::compile("${literal(false)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE(false == expr({ flow_file_a }).asBoolean());
 }
@@ -784,7 +779,7 @@
 TEST_CASE("Is Null", "[expressionIsNull]") {  // NOLINT
   auto expr = expression::compile("${filename:isNull()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -792,7 +787,7 @@
 TEST_CASE("Is Null 2", "[expressionIsNull2]") {  // NOLINT
   auto expr = expression::compile("${filename:isNull()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "7");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -800,7 +795,7 @@
 TEST_CASE("Not Null", "[expressionNotNull]") {  // NOLINT
   auto expr = expression::compile("${filename:notNull()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -808,7 +803,7 @@
 TEST_CASE("Not Null 2", "[expressionNotNull2]") {  // NOLINT
   auto expr = expression::compile("${filename:notNull()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "7");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -816,7 +811,7 @@
 TEST_CASE("Is Empty", "[expressionIsEmpty]") {  // NOLINT
   auto expr = expression::compile("${filename:isEmpty()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -824,7 +819,7 @@
 TEST_CASE("Is Empty 2", "[expressionIsEmpty2]") {  // NOLINT
   auto expr = expression::compile("${attr:isEmpty()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "7");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -832,7 +827,7 @@
 TEST_CASE("Is Empty 3", "[expressionIsEmpty3]") {  // NOLINT
   auto expr = expression::compile("${attr:isEmpty()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", " \t\r\n ");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -840,7 +835,7 @@
 TEST_CASE("Is Empty 4", "[expressionIsEmpty4]") {  // NOLINT
   auto expr = expression::compile("${attr:isEmpty()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -848,7 +843,7 @@
 TEST_CASE("Is Empty 5", "[expressionIsEmpty5]") {  // NOLINT
   auto expr = expression::compile("${attr:isEmpty()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", " \t\r\n a \t\r\n ");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -856,7 +851,7 @@
 TEST_CASE("Equals", "[expressionEquals]") {  // NOLINT
   auto expr = expression::compile("${attr:equals('hello.txt')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "hello.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -864,7 +859,7 @@
 TEST_CASE("Equals 2", "[expressionEquals2]") {  // NOLINT
   auto expr = expression::compile("${attr:equals('hello.txt')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "helllo.txt");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -872,7 +867,7 @@
 TEST_CASE("Equals 3", "[expressionEquals3]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5):equals(6)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -880,7 +875,7 @@
 TEST_CASE("Equals Ignore Case", "[expressionEqualsIgnoreCase]") {  // NOLINT
   auto expr = expression::compile("${attr:equalsIgnoreCase('hElLo.txt')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "hello.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -888,7 +883,7 @@
 TEST_CASE("Equals Ignore Case 2", "[expressionEqualsIgnoreCase2]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5):equalsIgnoreCase(6)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -896,7 +891,7 @@
 TEST_CASE("GT", "[expressionGT]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5):gt(5)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -904,7 +899,7 @@
 TEST_CASE("GT2", "[expressionGT2]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):gt(6.05)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -912,7 +907,7 @@
 TEST_CASE("GT3", "[expressionGT3]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):gt(6.15)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -920,7 +915,7 @@
 TEST_CASE("GE", "[expressionGE]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5):ge(6)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -928,7 +923,7 @@
 TEST_CASE("GE2", "[expressionGE2]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):ge(6.05)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -936,7 +931,7 @@
 TEST_CASE("GE3", "[expressionGE3]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):ge(6.15)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -944,7 +939,7 @@
 TEST_CASE("LT", "[expressionLT]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5):lt(5)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -952,7 +947,7 @@
 TEST_CASE("LT2", "[expressionLT2]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):lt(6.05)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -960,7 +955,7 @@
 TEST_CASE("LT3", "[expressionLT3]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):lt(6.15)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -968,7 +963,7 @@
 TEST_CASE("LE", "[expressionLE]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5):le(6)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -976,7 +971,7 @@
 TEST_CASE("LE2", "[expressionLE2]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):le(6.05)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -984,7 +979,7 @@
 TEST_CASE("LE3", "[expressionLE3]") {  // NOLINT
   auto expr = expression::compile("${attr:plus(5.1):le(6.15)}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("attr", "1");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -992,7 +987,7 @@
 TEST_CASE("And", "[expressionAnd]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('an')})}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -1000,7 +995,7 @@
 TEST_CASE("And 2", "[expressionAnd2]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('ab')})}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -1008,7 +1003,7 @@
 TEST_CASE("Or", "[expressionOr]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename} ):or(${filename:substring(0, 2):equals('an')})}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -1016,7 +1011,7 @@
 TEST_CASE("Or 2", "[expressionOr2]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename} ):or(${filename:substring(0, 2):equals('ab')})}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -1024,7 +1019,7 @@
 TEST_CASE("Not", "[expressionNot]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('an')}):not()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("false" == expr({ flow_file_a }).asString());
 }
@@ -1032,7 +1027,7 @@
 TEST_CASE("Not 2", "[expressionNot2]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('ab')}):not()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("true" == expr({ flow_file_a }).asString());
 }
@@ -1040,7 +1035,7 @@
 TEST_CASE("If Else", "[expressionIfElse]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename}):ifElse('yes', 'no')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "an example file.txt");
   REQUIRE("yes" == expr({ flow_file_a }).asString());
 }
@@ -1048,7 +1043,7 @@
 TEST_CASE("If Else 2", "[expressionIfElse2]") {  // NOLINT
   auto expr = expression::compile("${filename:toLower():equals( ${filename}):ifElse('yes', 'no')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("filename", "An example file.txt");
   REQUIRE("no" == expr({ flow_file_a }).asString());
 }
@@ -1056,7 +1051,7 @@
 TEST_CASE("Encode JSON", "[expressionEncodeJSON]") {  // NOLINT
   auto expr = expression::compile("${message:escapeJson()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "This is a \"test!\"");
   REQUIRE("This is a \\\"test!\\\"" == expr({ flow_file_a }).asString());
 }
@@ -1064,7 +1059,7 @@
 TEST_CASE("Decode JSON", "[expressionDecodeJSON]") {  // NOLINT
   auto expr = expression::compile("${message:unescapeJson()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "This is a \\\"test!\\\"");
   REQUIRE("This is a \"test!\"" == expr({ flow_file_a }).asString());
 }
@@ -1072,7 +1067,7 @@
 TEST_CASE("Encode Decode JSON", "[expressionEncodeDecodeJSON]") {  // NOLINT
   auto expr = expression::compile("${message:escapeJson():unescapeJson()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "This is a \"test!\"");
   REQUIRE("This is a \"test!\"" == expr({ flow_file_a }).asString());
 }
@@ -1080,7 +1075,7 @@
 TEST_CASE("Encode XML", "[expressionEncodeXML]") {  // NOLINT
   auto expr = expression::compile("${message:escapeXml()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
   REQUIRE("Zero &gt; One &lt; &quot;two!&quot; &amp; &apos;true&apos;" == expr({ flow_file_a }).asString());
 }
@@ -1088,7 +1083,7 @@
 TEST_CASE("Decode XML", "[expressionDecodeXML]") {  // NOLINT
   auto expr = expression::compile("${message:unescapeXml()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "Zero &gt; One &lt; &quot;two!&quot; &amp; &apos;true&apos;");
   REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
 }
@@ -1096,7 +1091,7 @@
 TEST_CASE("Encode Decode XML", "[expressionEncodeDecodeXML]") {  // NOLINT
   auto expr = expression::compile("${message:escapeXml():unescapeXml()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
   REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
 }
@@ -1104,7 +1099,7 @@
 TEST_CASE("Encode HTML3", "[expressionEncodeHTML3]") {  // NOLINT
   auto expr = expression::compile("${message:escapeHtml3()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "¥ & < «");
   REQUIRE("&yen; &amp; &lt; &laquo;" == expr({ flow_file_a }).asString());
 }
@@ -1112,7 +1107,7 @@
 TEST_CASE("Decode HTML3", "[expressionDecodeHTML3]") {  // NOLINT
   auto expr = expression::compile("${message:unescapeHtml3()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "&yen; &amp; &lt; &laquo;");
   REQUIRE("¥ & < «" == expr({ flow_file_a }).asString());
 }
@@ -1120,7 +1115,7 @@
 TEST_CASE("Encode Decode HTML3", "[expressionEncodeDecodeHTML3]") {  // NOLINT
   auto expr = expression::compile("${message:escapeHtml3():unescapeHtml3()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "&yen; &amp; &lt; &laquo;");
   REQUIRE("&yen; &amp; &lt; &laquo;" == expr({ flow_file_a }).asString());
 }
@@ -1128,7 +1123,7 @@
 TEST_CASE("Encode HTML4", "[expressionEncodeHTML4]") {  // NOLINT
   auto expr = expression::compile("${message:escapeHtml4()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "¥ & Φ < «");
   REQUIRE("&yen; &amp; &Phi; &lt; &laquo;" == expr({ flow_file_a }).asString());
 }
@@ -1136,7 +1131,7 @@
 TEST_CASE("Decode HTML4", "[expressionDecodeHTML4]") {  // NOLINT
   auto expr = expression::compile("${message:unescapeHtml4()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "&yen; &iota; &amp; &lt; &laquo;");
   REQUIRE("¥ ι & < «" == expr({ flow_file_a }).asString());
 }
@@ -1144,7 +1139,7 @@
 TEST_CASE("Encode Decode HTML4", "[expressionEncodeDecodeHTML4]") {  // NOLINT
   auto expr = expression::compile("${message:escapeHtml4():unescapeHtml4()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "&yen; &amp; &lt; &Pi; &laquo;");
   REQUIRE("&yen; &amp; &lt; &Pi; &laquo;" == expr({ flow_file_a }).asString());
 }
@@ -1152,7 +1147,7 @@
 TEST_CASE("Encode CSV", "[expressionEncodeCSV]") {  // NOLINT
   auto expr = expression::compile("${message:escapeCsv()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
   REQUIRE("\"Zero > One < \"\"two!\"\" & 'true'\"" == expr({ flow_file_a }).asString());
 }
@@ -1160,7 +1155,7 @@
 TEST_CASE("Decode CSV", "[expressionDecodeCSV]") {  // NOLINT
   auto expr = expression::compile("${message:unescapeCsv()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", R"("Zero > One < ""two!"" & 'true'")");
   REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
 }
@@ -1168,7 +1163,7 @@
 TEST_CASE("Decode CSV 2", "[expressionDecodeCSV2]") {  // NOLINT
   auto expr = expression::compile("${message:unescapeCsv()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", R"("quoted")");
   REQUIRE("\"quoted\"" == expr({ flow_file_a }).asString());
 }
@@ -1176,7 +1171,7 @@
 TEST_CASE("Encode Decode CSV", "[expressionEncodeDecodeCSV]") {  // NOLINT
   auto expr = expression::compile("${message:escapeCsv():unescapeCsv()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
   REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
 }
@@ -1186,7 +1181,7 @@
 TEST_CASE("Encode URL", "[expressionEncodeURL]") {  // NOLINT
   auto expr = expression::compile("${message:urlEncode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "some value with spaces");
   REQUIRE("some%20value%20with%20spaces" == expr({ flow_file_a }).asString());
 }
@@ -1194,7 +1189,7 @@
 TEST_CASE("Decode URL", "[expressionDecodeURL]") {  // NOLINT
   auto expr = expression::compile("${message:urlDecode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "some%20value%20with%20spaces");
   REQUIRE("some value with spaces" == expr({ flow_file_a }).asString());
 }
@@ -1202,7 +1197,7 @@
 TEST_CASE("Encode Decode URL", "[expressionEncodeDecodeURL]") {  // NOLINT
   auto expr = expression::compile("${message:urlEncode():urlDecode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "some value with spaces");
   REQUIRE("some value with spaces" == expr({ flow_file_a }).asString());
 }
@@ -1210,7 +1205,7 @@
 TEST_CASE("Encode URL", "[expressionEncodeURLExcept]") {  // NOLINT
   auto expr = expression::compile("${message:urlEncode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "some value with spaces");
   REQUIRE_THROWS(expr({flow_file_a}).asString());
 }
@@ -1218,7 +1213,7 @@
 TEST_CASE("Decode URL", "[expressionDecodeURLExcept]") {  // NOLINT
   auto expr = expression::compile("${message:urlDecode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "some%20value%20with%20spaces");
   REQUIRE_THROWS(expr({flow_file_a}).asString());
 }
@@ -1226,7 +1221,7 @@
 TEST_CASE("Encode Decode URL", "[expressionEncodeDecodeURLExcept]") {  // NOLINT
   auto expr = expression::compile("${message:urlEncode():urlDecode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "some value with spaces");
   REQUIRE_THROWS(expr({flow_file_a}).asString());
 }
@@ -1238,7 +1233,7 @@
 TEST_CASE("Parse Date", "[expressionParseDate]") {  // NOLINT
   auto expr = expression::compile("${message:toDate('%Y/%m/%d', 'America/Los_Angeles')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "2014/04/30");
   REQUIRE("1398841200000" == expr({ flow_file_a }).asString());
 }
@@ -1246,7 +1241,7 @@
 TEST_CASE("Format Date", "[expressionFormatDate]") {  // NOLINT
   auto expr = expression::compile("${message:format('%m-%d-%Y', 'GMT')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "1394755200000");
   REQUIRE("03-14-2014" == expr({ flow_file_a }).asString());
 }
@@ -1254,7 +1249,7 @@
 TEST_CASE("Reformat Date", "[expressionReformatDate]") {  // NOLINT
   auto expr = expression::compile("${message:toDate('%Y/%m/%d', 'GMT'):format('%m-%d-%Y', 'America/New_York')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "2014/03/14");
   REQUIRE("03-13-2014" == expr({ flow_file_a }).asString());
 }
@@ -1262,7 +1257,7 @@
 TEST_CASE("Now Date", "[expressionNowDate]") {  // NOLINT
   auto expr = expression::compile("${now():format('%Y')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "2014/03/14");
   time_t t = time(nullptr);
   struct tm lt;
@@ -1276,28 +1271,28 @@
 TEST_CASE("IP", "[expressionIP]") {  // NOLINT
   auto expr = expression::compile("${ip()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
-  REQUIRE("" != expr({ flow_file_a }).asString());
+  auto flow_file_a = std::make_shared<core::FlowFile>();
+  REQUIRE("" != expr( { flow_file_a }).asString());
 }
 
 TEST_CASE("Full Hostname", "[expressionFullHostname]") {  // NOLINT
   auto expr = expression::compile("${hostname('true')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
-  REQUIRE("" != expr({ flow_file_a }).asString());
+  auto flow_file_a = std::make_shared<core::FlowFile>();
+  REQUIRE("" != expr( { flow_file_a }).asString());
 }
 
 TEST_CASE("UUID", "[expressionUuid]") {  // NOLINT
   auto expr = expression::compile("${UUID()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
-  REQUIRE(36 == expr({ flow_file_a }).asString().length());
+  auto flow_file_a = std::make_shared<core::FlowFile>();
+  REQUIRE(36 == expr( { flow_file_a }).asString().length());
 }
 
 TEST_CASE("Trim", "[expressionTrim]") {  // NOLINT
   auto expr = expression::compile("${message:trim()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", " 1 2 3 ");
   REQUIRE("1 2 3" == expr({ flow_file_a }).asString());
 }
@@ -1305,7 +1300,7 @@
 TEST_CASE("Append", "[expressionAppend]") {  // NOLINT
   auto expr = expression::compile("${message:append('.gz')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "a brand new filename.txt");
   REQUIRE("a brand new filename.txt.gz" == expr({ flow_file_a }).asString());
 }
@@ -1313,7 +1308,7 @@
 TEST_CASE("Prepend", "[expressionPrepend]") {  // NOLINT
   auto expr = expression::compile("${message:prepend('a brand new ')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "filename.txt");
   REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
 }
@@ -1321,7 +1316,7 @@
 TEST_CASE("Length", "[expressionLength]") {  // NOLINT
   auto expr = expression::compile("${message:length()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "a brand new filename.txt");
   REQUIRE(24 == expr({ flow_file_a }).asUnsignedLong());
 }
@@ -1329,7 +1324,7 @@
 TEST_CASE("Encode B64", "[expressionEncodeB64]") {  // NOLINT
   auto expr = expression::compile("${message:base64Encode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "admin:admin");
   REQUIRE("YWRtaW46YWRtaW4=" == expr({ flow_file_a }).asString());
 }
@@ -1337,7 +1332,7 @@
 TEST_CASE("Decode B64", "[expressionDecodeB64]") {  // NOLINT
   auto expr = expression::compile("${message:base64Decode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "YWRtaW46YWRtaW4=");
   REQUIRE("admin:admin" == expr({ flow_file_a }).asString());
 }
@@ -1345,7 +1340,7 @@
 TEST_CASE("Encode Decode B64", "[expressionEncodeDecodeB64]") {  // NOLINT
   auto expr = expression::compile("${message:base64Encode():base64Decode()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
   REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
 }
@@ -1353,7 +1348,7 @@
 TEST_CASE("All Contains", "[expressionAllContains]") {  // NOLINT
   auto expr = expression::compile("${allAttributes('a', 'b'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "hello 1");
   flow_file_a->addAttribute("b", "hello 2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1362,7 +1357,7 @@
 TEST_CASE("All Contains 2", "[expressionAllContains2]") {  // NOLINT
   auto expr = expression::compile("${allAttributes('a', 'b'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "hello 1");
   flow_file_a->addAttribute("b", "mello 2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1371,7 +1366,7 @@
 TEST_CASE("Any Contains", "[expressionAnyContains]") {  // NOLINT
   auto expr = expression::compile("${anyAttribute('a', 'b'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "hello 1");
   flow_file_a->addAttribute("b", "mello 2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1380,7 +1375,7 @@
 TEST_CASE("Any Contains 2", "[expressionAnyContains2]") {  // NOLINT
   auto expr = expression::compile("${anyAttribute('a', 'b'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "mello 1");
   flow_file_a->addAttribute("b", "mello 2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1391,7 +1386,7 @@
 TEST_CASE("All Matching Contains", "[expressionAllMatchingContains]") {  // NOLINT
   auto expr = expression::compile("${allMatchingAttributes('xyz_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "hello 1");
   flow_file_a->addAttribute("xyz_2", "hello 2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1400,7 +1395,7 @@
 TEST_CASE("All Matching Contains 2", "[expressionAllMatchingContains2]") {  // NOLINT
   auto expr = expression::compile("${allMatchingAttributes('abc_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "hello 1");
   flow_file_a->addAttribute("xyz_2", "hello 2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1409,7 +1404,7 @@
 TEST_CASE("All Matching Contains 3", "[expressionAllMatchingContains3]") {  // NOLINT
   auto expr = expression::compile("${allMatchingAttributes('abc_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "hello 1");
   flow_file_a->addAttribute("abc_2", "hello 2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1418,7 +1413,7 @@
 TEST_CASE("All Matching Contains 4", "[expressionAllMatchingContains4]") {  // NOLINT
   auto expr = expression::compile("${allMatchingAttributes('xyz_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "hello 1");
   flow_file_a->addAttribute("xyz_2", "2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1427,7 +1422,7 @@
 TEST_CASE("Any Matching Contains", "[expressionAnyMatchingContains]") {  // NOLINT
   auto expr = expression::compile("${anyMatchingAttribute('xyz_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "hello 1");
   flow_file_a->addAttribute("xyz_2", "mello 2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1436,7 +1431,7 @@
 TEST_CASE("Any Matching Contains 2", "[expressionAnyMatchingContains2]") {  // NOLINT
   auto expr = expression::compile("${anyMatchingAttribute('abc_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "hello 1");
   flow_file_a->addAttribute("xyz_2", "mello 2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1445,7 +1440,7 @@
 TEST_CASE("Any Matching Contains 3", "[expressionAnyMatchingContains3]") {  // NOLINT
   auto expr = expression::compile("${anyMatchingAttribute('abc_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("abc_1", "hello 1");
   flow_file_a->addAttribute("xyz_2", "mello 2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1454,7 +1449,7 @@
 TEST_CASE("Any Matching Contains 4", "[expressionAnyMatchingContains4]") {  // NOLINT
   auto expr = expression::compile("${anyMatchingAttribute('abc_.*'):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("xyz_1", "mello 1");
   flow_file_a->addAttribute("xyz_2", "mello 2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1465,7 +1460,7 @@
 TEST_CASE("All Delineated Contains", "[expressionAllDelineatedContains]") {  // NOLINT
   auto expr = expression::compile("${allDelineatedValues(${word_list}, \",\"):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("word_list", "hello_1,hello_2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
 }
@@ -1473,7 +1468,7 @@
 TEST_CASE("All Delineated Contains 2", "[expressionAllDelineatedContains2]") {  // NOLINT
   auto expr = expression::compile("${allDelineatedValues(${word_list}, \",\"):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("word_list", "hello_1,mello_2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
 }
@@ -1481,7 +1476,7 @@
 TEST_CASE("All Delineated Contains 3", "[expressionAllDelineatedContains3]") {  // NOLINT
   auto expr = expression::compile("${allDelineatedValues(${word_list}, \" \"):contains('1,h')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("word_list", "hello_1,hello_2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
 }
@@ -1489,7 +1484,7 @@
 TEST_CASE("Any Delineated Contains", "[expressionAnyDelineatedContains]") {  // NOLINT
   auto expr = expression::compile("${anyDelineatedValue(${word_list}, \",\"):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("word_list", "hello_1,mello_2");
   REQUIRE(expr({ flow_file_a }).asBoolean());
 }
@@ -1497,7 +1492,7 @@
 TEST_CASE("Any Delineated Contains 2", "[expressionAnyDelineatedContains2]") {  // NOLINT
   auto expr = expression::compile("${anyDelineatedValue(${word_list}, \",\"):contains('hello')}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("word_list", "mello_1,mello_2");
   REQUIRE(!expr({ flow_file_a }).asBoolean());
 }
@@ -1505,7 +1500,7 @@
 TEST_CASE("Count", "[expressionCount]") {  // NOLINT
   auto expr = expression::compile("${allAttributes('a', 'b'):contains('hello'):count()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "hello 1");
   flow_file_a->addAttribute("b", "mello 2");
   REQUIRE(1 == expr({ flow_file_a }).asUnsignedLong());
@@ -1514,7 +1509,7 @@
 TEST_CASE("Count 2", "[expressionCount2]") {  // NOLINT
   auto expr = expression::compile("${allAttributes('a', 'b'):contains('mello'):count()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "mello 1");
   flow_file_a->addAttribute("b", "mello 2");
   flow_file_a->addAttribute("c", "hello 3");
@@ -1524,7 +1519,7 @@
 TEST_CASE("Count 3", "[expressionCount3]") {  // NOLINT
   auto expr = expression::compile("abc${allAttributes('a', 'b'):contains('mello'):count()}xyz");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "mello 1");
   flow_file_a->addAttribute("b", "mello 2");
   flow_file_a->addAttribute("c", "hello 3");
@@ -1534,7 +1529,7 @@
 TEST_CASE("Join", "[expressionJoin]") {  // NOLINT
   auto expr = expression::compile("abc_${allAttributes('a', 'b'):prepend('def_'):append('_ghi'):join(\"|\")}_xyz");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "hello");
   flow_file_a->addAttribute("b", "mello");
   REQUIRE("abc_def_hello_ghi|def_mello_ghi_xyz" == expr({ flow_file_a }).asString());
@@ -1543,7 +1538,7 @@
 TEST_CASE("Join 2", "[expressionJoin2]") {  // NOLINT
   auto expr = expression::compile("abc_${allAttributes('a', 'b'):join(\"|\"):prepend('def_'):append('_ghi')}_xyz");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
   flow_file_a->addAttribute("a", "hello");
   flow_file_a->addAttribute("b", "mello");
   REQUIRE("abc_def_hello|mello_ghi_xyz" == expr({ flow_file_a }).asString());
@@ -1552,7 +1547,7 @@
 TEST_CASE("resolve_user_id_test", "[resolve_user_id tests]") {  // NOLINT
   auto expr = expression::compile("${attribute_sid:resolve_user_id()}");
 
-  auto flow_file_a = std::make_shared<MockFlowFile>();
+  auto flow_file_a = std::make_shared<core::FlowFile>();
 
   SECTION("TEST 0"){
   flow_file_a->addAttribute("attribute_sid", "0");
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 62f63aa..c3042f6 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -54,6 +54,7 @@
   }
   return is_valid_;
 }
+
 void DatabaseContentRepository::stop() {
   if (db_) {
     auto opendb = db_->open();
@@ -64,54 +65,54 @@
   db_.reset();
 }
 
-std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
   // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
   // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
-  if (nullptr == claim || !is_valid_ || !db_)
+  if (!is_valid_ || !db_)
     return nullptr;
   // append is already supported in all modes
-  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
+  return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
 }
 
-std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::ResourceClaim &claim) {
   // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
   // we can simply return a nullptr, which is also valid from the API when this stream is not valid.
-  if (nullptr == claim || !is_valid_ || !db_)
+  if (!is_valid_ || !db_)
     return nullptr;
-  return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
+  return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
 }
 
-bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
   auto opendb = db_->open();
   if (!opendb) {
     return false;
   }
   std::string value;
   rocksdb::Status status;
-  status = opendb->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
+  status = opendb->Get(rocksdb::ReadOptions(), streamId.getContentFullPath(), &value);
   if (status.ok()) {
-    logger_->log_debug("%s exists", streamId->getContentFullPath());
+    logger_->log_debug("%s exists", streamId.getContentFullPath());
     return true;
   } else {
-    logger_->log_debug("%s does not exist", streamId->getContentFullPath());
+    logger_->log_debug("%s does not exist", streamId.getContentFullPath());
     return false;
   }
 }
 
-bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
-  if (nullptr == claim || !is_valid_ || !db_)
+bool DatabaseContentRepository::remove(const minifi::ResourceClaim &claim) {
+  if (!is_valid_ || !db_)
     return false;
   auto opendb = db_->open();
   if (!opendb) {
     return false;
   }
   rocksdb::Status status;
-  status = opendb->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
+  status = opendb->Delete(rocksdb::WriteOptions(), claim.getContentFullPath());
   if (status.ok()) {
-    logger_->log_debug("Deleted %s", claim->getContentFullPath());
+    logger_->log_debug("Deleted %s", claim.getContentFullPath());
     return true;
   } else {
-    logger_->log_debug("Attempted, but could not delete %s", claim->getContentFullPath());
+    logger_->log_debug("Attempted, but could not delete %s", claim.getContentFullPath());
     return false;
   }
 }
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 85e2f7d..6abc476 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -86,17 +86,17 @@
 
   virtual void stop();
 
-  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
+  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false);
 
-  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
 
-  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  virtual bool close(const minifi::ResourceClaim &claim) {
     return remove(claim);
   }
 
-  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual bool remove(const minifi::ResourceClaim &claim);
 
-  virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+  virtual bool exists(const minifi::ResourceClaim &streamId);
 
   virtual void yield() {
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 57d3ab6..b45647c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -87,9 +87,7 @@
   if (content_repo_) {
     for (const auto &ffr : purgeList) {
       auto claim = ffr->getResourceClaim();
-      if (claim) {
-        content_repo_->removeIfOrphaned(claim);
-      }
+      if (claim) claim->decreaseFlowFileRecordOwnedCount();
     }
   }
 }
@@ -165,14 +163,15 @@
         found = (search != connectionMap.end());
       }
       if (found) {
-        // we find the connection for the persistent flowfile, create the flowfile and enqueue that
-        std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
         eventRead->setStoredToRepository(true);
+        // we found the connection for the persistent flowFile
+        auto claim = eventRead->getResourceClaim();
+        // on behalf of the just resurrected persisted instance
+        if (claim) claim->increaseFlowFileRecordOwnedCount();
+        // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
         search->second->put(eventRead);
       } else {
         logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-        auto claim = eventRead->getResourceClaim();
-        if (claim) claim->decreaseFlowFileRecordOwnedCount();
         keys_to_delete.enqueue(key);
       }
     } else {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index b3cfc2b..e69fd19 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -182,6 +182,7 @@
     if (running_) {
       return;
     }
+    content_repo_->reset();
     running_ = true;
     thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
     logger_->log_debug("%s Repository Monitor Thread Start", getName());
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 7557056..7c80aef 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -163,11 +163,6 @@
     return claim_ ? claim_->getContentFullPath() : "";
   }
 
-  /**
-   * Cleanly relinquish a resource claim
-   */
-  virtual void releaseClaim(std::shared_ptr<ResourceClaim> claim);
-
   FlowFileRecord &operator=(const FlowFileRecord &);
 
   FlowFileRecord(const FlowFileRecord &parent) = delete;
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index d773c3d..9a4070d 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -45,8 +45,10 @@
 extern void setDefaultDirectory(std::string);
 
 // ResourceClaim Class
-class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
+class ResourceClaim {
  public:
+  // the type which uniquely represents the resource for the owning manager
+  using Path = std::string;
   // Constructor
   /*!
    * Create a new resource claim
@@ -55,42 +57,32 @@
 
   explicit ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager);
 
-  explicit ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false);
+  explicit ResourceClaim(const Path& path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager);
 
   // Destructor
-  ~ResourceClaim() = default;
+  ~ResourceClaim();
   // increaseFlowFileRecordOwnedCount
   void increaseFlowFileRecordOwnedCount() {
-    claim_manager_->incrementStreamCount(shared_from_this());
+    claim_manager_->incrementStreamCount(*this);
   }
   // decreaseFlowFileRecordOwenedCount
   void decreaseFlowFileRecordOwnedCount() {
-    claim_manager_->decrementStreamCount(shared_from_this());
+    claim_manager_->decrementStreamCount(*this);
   }
   // getFlowFileRecordOwenedCount
   uint64_t getFlowFileRecordOwnedCount() {
-    return claim_manager_->getStreamCount(shared_from_this());
+    return claim_manager_->getStreamCount(*this);
   }
   // Get the content full path
-  std::string getContentFullPath() {
+  Path getContentFullPath() const {
     return _contentFullPath;
   }
-  // Set the content full path
-  void setContentFullPath(std::string path) {
-    _contentFullPath = path;
-  }
-
-  void deleteClaim() {
-    if (!deleted_) {
-      deleted_ = true;
-    }
-  }
 
   bool exists() {
     if (claim_manager_ == nullptr) {
       return false;
     }
-    return claim_manager_->exists(shared_from_this());
+    return claim_manager_->exists(*this);
   }
 
   friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) {
@@ -104,9 +96,8 @@
   }
 
  protected:
-  std::atomic<bool> deleted_;
   // Full path to the content
-  std::string _contentFullPath;
+  const Path _contentFullPath;
 
   std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_;
 
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 2fca0a3..2b9bbae 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -56,30 +56,14 @@
    */
   virtual void stop() = 0;
 
-  /**
-   * Removes an item if it was orphan
-   */
-  virtual bool removeIfOrphaned(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+  void reset() {
     std::lock_guard<std::mutex> lock(count_map_mutex_);
-    const std::string str = streamId->getContentFullPath();
-    auto count = count_map_.find(str);
-    if (count != count_map_.end()) {
-      if (count_map_[str] == 0) {
-        remove(streamId);
-        count_map_.erase(str);
-        return true;
-      } else {
-        return false;
-      }
-    } else {
-      remove(streamId);
-      return true;
-    }
+    count_map_.clear();
   }
 
-  virtual uint32_t getStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+  virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId) {
     std::lock_guard<std::mutex> lock(count_map_mutex_);
-    auto cnt = count_map_.find(streamId->getContentFullPath());
+    auto cnt = count_map_.find(streamId.getContentFullPath());
     if (cnt != count_map_.end()) {
       return cnt->second;
     } else {
@@ -87,9 +71,9 @@
     }
   }
 
-  virtual void incrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+  virtual void incrementStreamCount(const minifi::ResourceClaim &streamId) {
     std::lock_guard<std::mutex> lock(count_map_mutex_);
-    const std::string str = streamId->getContentFullPath();
+    const std::string str = streamId.getContentFullPath();
     auto count = count_map_.find(str);
     if (count != count_map_.end()) {
       count_map_[str] = count->second + 1;
@@ -98,14 +82,17 @@
     }
   }
 
-  virtual void decrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+  virtual StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) {
     std::lock_guard<std::mutex> lock(count_map_mutex_);
-    const std::string str = streamId->getContentFullPath();
+    const std::string str = streamId.getContentFullPath();
     auto count = count_map_.find(str);
-    if (count != count_map_.end() && count->second > 0) {
+    if (count != count_map_.end() && count->second > 1) {
       count_map_[str] = count->second - 1;
+      return StreamState::Alive;
     } else {
       count_map_.erase(str);
+      remove(streamId);
+      return StreamState::Deleted;
     }
   }
 
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index a546953..34daa65 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -36,64 +36,6 @@
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
- private:
-  class FlowFileOwnedResourceClaimPtr{
-   public:
-    FlowFileOwnedResourceClaimPtr() = default;
-    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
-      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
-    }
-    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
-      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
-    }
-    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
-      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
-    }
-    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
-      // taking ownership of claim, no need to increment/decrement
-    }
-    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
-    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
-
-    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
-      return set(owner, ref.claim_);
-    }
-    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
-      auto oldClaim = claim_;
-      claim_ = newClaim;
-      // the order of increase/release is important
-      // with refcount manipulation we should always increment first, then decrement as this way we don't accidentally
-      // discard the object under ourselves, note that an equality check will not suffice as two ResourceClaim
-      // instances can reference the same file (they could have the same contentPath)
-      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
-      if (oldClaim) owner.releaseClaim(oldClaim);
-      return *this;
-    }
-    const std::shared_ptr<ResourceClaim>& get() const {
-      return claim_;
-    }
-    const std::shared_ptr<ResourceClaim>& operator->() const {
-      return claim_;
-    }
-    operator bool() const noexcept {
-      return static_cast<bool>(claim_);
-    }
-    ~FlowFileOwnedResourceClaimPtr() {
-      // allow the owner FlowFile to manually release the claim
-      // while logging stuff and removing it from repositories
-      assert(!claim_);
-    }
-
-   private:
-    /*
-     * We are aiming for the constraint that all FlowFiles should have a non-null claim pointer,
-     * unfortunately, for now, some places (e.g. ProcessSession::create) violate this constraint.
-     * We should indicate an empty or invalid content with special claims like
-     * InvalidResourceClaim and EmptyResourceClaim.
-     */
-    std::shared_ptr<ResourceClaim> claim_;
-  };
-
  public:
   FlowFile();
   ~FlowFile() override;
@@ -136,12 +78,6 @@
   bool hasStashClaim(const std::string& key);
 
   /**
-   * Decrease the flow file record owned count for the resource claim and, if 
-   * its counter is at zero, remove it from the repo.
-   */
-  virtual void releaseClaim(const std::shared_ptr<ResourceClaim> claim) = 0;
-
-  /**
    * Get lineage identifiers
    */
   std::set<std::string>& getlineageIdentifiers();
@@ -373,9 +309,9 @@
   // Attributes key/values pairs for the flow record
   std::map<std::string, std::string> attributes_;
   // Pointer to the associated content resource claim
-  FlowFileOwnedResourceClaimPtr claim_;
+  std::shared_ptr<ResourceClaim> claim_;
   // Pointers to stashed content resource claims
-  std::map<std::string, FlowFileOwnedResourceClaimPtr> stashedContent_;
+  std::map<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
   // UUID string
   // std::string uuid_str_;
   // UUID string for all parents
diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h
index 57e839f..46a9e20 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -40,6 +40,10 @@
 template<typename T>
 class StreamManager {
  public:
+  enum class StreamState{
+    Deleted,
+    Alive
+  };
   virtual ~StreamManager() = default;
 
   virtual std::string getStoragePath() const = 0;
@@ -49,21 +53,21 @@
    * @param streamId stream identifier
    * @return stream pointer.
    */
-  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T> &streamId, bool append = false) = 0;
+  virtual std::shared_ptr<io::BaseStream> write(const T &streamId, bool append = false) = 0;
 
   /**
    * Create a read stream using the streamId as a reference.
    * @param streamId stream identifier
    * @return stream pointer.
    */
-  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<T> &streamId) = 0;
+  virtual std::shared_ptr<io::BaseStream> read(const T &streamId) = 0;
 
   /**
    * Closes the stream
    * @param streamId stream identifier
    * @return result of operation.
    */
-  virtual bool close(const std::shared_ptr<T> &streamId) = 0;
+  virtual bool close(const T &streamId) = 0;
 
   /**
    * Removes the stream from this stream manager. The end result
@@ -71,20 +75,15 @@
    * @param streamId stream identifier
    * @return result of operation.
    */
-  virtual bool remove(const std::shared_ptr<T> &streamId) = 0;
+  virtual bool remove(const T &streamId) = 0;
 
-  /**
-   * Removes an item if it was orphan
-   */
-  virtual bool removeIfOrphaned(const std::shared_ptr<T> &streamId) = 0;
+  virtual uint32_t getStreamCount(const T &streamId) = 0;
 
-  virtual uint32_t getStreamCount(const std::shared_ptr<T> &streamId) = 0;
+  virtual void incrementStreamCount(const T &streamId) = 0;
 
-  virtual void incrementStreamCount(const std::shared_ptr<T> &streamId) = 0;
+  virtual StreamState decrementStreamCount(const T &streamId) = 0;
 
-  virtual void decrementStreamCount(const std::shared_ptr<T> &streamId) = 0;
-
-  virtual bool exists(const std::shared_ptr<T> &streamId) = 0;
+  virtual bool exists(const T &streamId) = 0;
 };
 
 }  // namespace core
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 9439ec3..c2b8c75 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -47,17 +47,17 @@
 
   virtual void stop();
 
-  bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+  bool exists(const minifi::ResourceClaim &streamId);
 
-  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
+  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false);
 
-  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
 
-  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  virtual bool close(const minifi::ResourceClaim &claim) {
     return remove(claim);
   }
 
-  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual bool remove(const minifi::ResourceClaim &claim);
 
  private:
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 037f7b8..03ece73 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -41,13 +41,13 @@
  * Purpose: Stages content into a volatile area of memory. Note that   when the maximum number
  * of entries is consumed we will rollback a session to wait for others to be freed.
  */
-class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
+class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<ResourceClaim::Path> {
  public:
   static const char *minimal_locking;
 
   explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>())
       : core::SerializableComponent(name),
-        core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name),
+        core::repository::VolatileRepository<ResourceClaim::Path>(name),
         minimize_locking_(true),
         logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
     max_count_ = 15000;
@@ -79,22 +79,22 @@
    * @param claim resource claim
    * @return BaseStream shared pointer that represents the stream the consumer will write to.
    */
-  virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append);
+  virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append);
 
   /**
    * Creates readable stream.
    * @param claim resource claim
    * @return BaseStream shared pointer that represents the stream from which the consumer will read..
    */
-  virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
 
-  virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+  virtual bool exists(const minifi::ResourceClaim &streamId);
 
   /**
    * Closes the claim.
    * @return whether or not the claim is associated with content stored in volatile memory.
    */
-  virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+  virtual bool close(const minifi::ResourceClaim &claim) {
     return remove(claim);
   }
 
@@ -102,7 +102,7 @@
    * Closes the claim.
    * @return whether or not the claim is associated with content stored in volatile memory.
    */
-  virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+  virtual bool remove(const minifi::ResourceClaim &claim);
 
  protected:
   virtual void start();
@@ -117,16 +117,11 @@
  private:
   bool minimize_locking_;
 
-  // function pointers that are associated with the claims.
-  std::function<bool(std::shared_ptr<minifi::ResourceClaim>, std::shared_ptr<minifi::ResourceClaim>)> resource_claim_comparator_;
-  std::function<bool(std::shared_ptr<minifi::ResourceClaim>)> resource_claim_check_;
-  std::function<void(std::shared_ptr<minifi::ResourceClaim>)> claim_reclaimer_;
-
   // mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list.
   // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can.
   std::mutex map_mutex_;
 
-  std::map<std::string, AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>*> master_list_;
+  std::map<ResourceClaim::Path, AtomicEntry<ResourceClaim::Path>*> master_list_;
 
   // logger
   std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index c276d28..1c65f0e 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -50,20 +50,23 @@
     repo_full_ = false;
     while (running_) {
       std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
-      if (purge_required_ && nullptr != content_repo_) {
-        std::lock_guard<std::mutex> lock(purge_mutex_);
-        for (auto purgeItem : purge_list_) {
-          std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
-          if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
-            std::shared_ptr<minifi::ResourceClaim> newClaim = eventRead->getResourceClaim();
-            if (newClaim != nullptr) {
-              content_repo_->removeIfOrphaned(newClaim);
-            }
-          }
+      flush();
+    }
+    flush();
+  }
+
+  virtual void flush() {
+    if (purge_required_ && nullptr != content_repo_) {
+      std::lock_guard<std::mutex> lock(purge_mutex_);
+      for (auto purgeItem : purge_list_) {
+        std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+        if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
+          auto claim = eventRead->getResourceClaim();
+          if (claim) claim->decreaseFlowFileRecordOwnedCount();
         }
-        purge_list_.resize(0);
-        purge_list_.clear();
       }
+      purge_list_.resize(0);
+      purge_list_.clear();
     }
   }
 
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index b38d17b..116e29a 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -173,7 +173,6 @@
 
 class NonRepeatingStringGenerator {
  public:
-  NonRepeatingStringGenerator();
   std::string generate() {
     return prefix_ + std::to_string(incrementor_++);
   }
@@ -181,8 +180,8 @@
       return incrementor_++;
     }
  private:
-  std::atomic<uint64_t> incrementor_;
-  std::string prefix_;
+  std::atomic<uint64_t> incrementor_{0};
+  std::string prefix_{std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()) + "-"};
 };
 
 }  // namespace utils
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 358b589..882686c 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -46,7 +46,7 @@
       content_repo_(content_repo),
       flow_repository_(flow_repository) {
   id_ = local_flow_seq_number_.load();
-  claim_.set(*this, claim);
+  claim_ = claim;
   // Increase the local ID for the flow record
   ++local_flow_seq_number_;
   // Populate the default attributes
@@ -77,7 +77,7 @@
   offset_ = event->getOffset();
   event->getUUID(uuid_);
   uuid_connection_ = uuidConnection;
-  claim_.set(*this, event->getResourceClaim());
+  claim_ = event->getResourceClaim();
   if (event->getFlowIdentifier()) {
     std::string attr;
     event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -94,7 +94,7 @@
       snapshot_(""),
       content_repo_(content_repo),
       flow_repository_(flow_repository) {
-  claim_.set(*this, event->getResourceClaim());
+  claim_ = event->getResourceClaim();
   if (event->getFlowIdentifier()) {
     std::string attr;
     event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -115,23 +115,6 @@
   if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
-
-  claim_.set(*this, nullptr);
-
-  // Disown stash claims
-  for (auto &stashPair : stashedContent_) {
-    auto& stashClaim = stashPair.second;
-    stashClaim.set(*this, nullptr);
-  }
-}
-
-void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
-  // Decrease the flow file record owned count for the resource claim
-  claim->decreaseFlowFileRecordOwnedCount();
-  logger_->log_debug("Detaching Resource Claim %s, %s, attempt " "%" PRIu64, getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());
-  if (content_repo_ && content_repo_->removeIfOrphaned(claim)) {
-    logger_->log_debug("Deleted Resource Claim %s", claim->getContentFullPath());
-  }
 }
 
 bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, const std::string &value) {
@@ -358,7 +341,7 @@
     return false;
   }
 
-  claim_.set(*this, std::make_shared<ResourceClaim>(content_full_path, content_repo_, true));
+  claim_ = std::make_shared<ResourceClaim>(content_full_path, content_repo_);
   return true;
 }
 
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index 1e08e20..92e39b2 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -41,22 +41,28 @@
 }
 
 ResourceClaim::ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager)
-    : claim_manager_(claim_manager),
-      deleted_(false),
-      logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) {
-  auto contentDirectory = claim_manager_->getStoragePath();
-  if (contentDirectory.empty())
-    contentDirectory = default_directory_path;
+    : _contentFullPath([&] {
+        auto contentDirectory = claim_manager->getStoragePath();
+        if (contentDirectory.empty())
+          contentDirectory = default_directory_path;
 
-  // Create the full content path for the content
-  _contentFullPath = contentDirectory + "/" + non_repeating_string_generator_.generate();
+        // Create the full content path for the content
+        return contentDirectory + "/" + non_repeating_string_generator_.generate();
+      }()),
+      claim_manager_(std::move(claim_manager)),
+      logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) {
+  if (claim_manager_) increaseFlowFileRecordOwnedCount();
   logger_->log_debug("Resource Claim created %s", _contentFullPath);
 }
 
-ResourceClaim::ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted)
-    : claim_manager_(claim_manager),
-      deleted_(deleted) {
-  _contentFullPath = path;
+ResourceClaim::ResourceClaim(const Path& path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager)
+    : _contentFullPath(path),
+      claim_manager_(std::move(claim_manager)) {
+  if (claim_manager_) increaseFlowFileRecordOwnedCount();
+}
+
+ResourceClaim::~ResourceClaim() {
+  if (claim_manager_) decreaseFlowFileRecordOwnedCount();
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index f5765f8..095c601 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -64,7 +64,7 @@
   size_ = other.size_;
   penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
   attributes_ = other.attributes_;
-  claim_.set(*this, other.claim_);
+  claim_ = other.claim_;
   uuidStr_ = other.uuidStr_;
   connection_ = other.connection_;
   original_connection_ = other.original_connection_;
@@ -93,18 +93,18 @@
 }
 
 std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
-  return claim_.get();
+  return claim_;
 }
 
 void FlowFile::clearResourceClaim() {
-  claim_.set(*this, nullptr);
+  claim_ = nullptr;
 }
 void FlowFile::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) {
-  claim_.set(*this, claim);
+  claim_ = claim;
 }
 
 std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string& key) {
-  return stashedContent_[key].get();
+  return stashedContent_[key];
 }
 
 void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) {
@@ -114,13 +114,13 @@
                       getUUIDStr().c_str(), key.c_str());
   }
 
-  stashedContent_[key].set(*this, claim);
+  stashedContent_[key] = claim;
 }
 
 void FlowFile::clearStashClaim(const std::string& key) {
   auto claimIt = stashedContent_.find(key);
   if (claimIt != stashedContent_.end()) {
-    claimIt->second.set(*this, nullptr);
+    claimIt->second = nullptr;
     stashedContent_.erase(claimIt);
   }
 }
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index c6de75a..eb977ca 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -245,7 +245,7 @@
 
   try {
     uint64_t startTime = utils::timeutils::getTimeMillis();
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
     // Call the callback to write the content
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
@@ -280,7 +280,7 @@
 
   try {
     uint64_t startTime = utils::timeutils::getTimeMillis();
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim, true);
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
     }
@@ -323,7 +323,7 @@
 
     claim = flow->getResourceClaim();
 
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(*claim);
 
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
@@ -356,7 +356,7 @@
 
   try {
     auto startTime = utils::timeutils::getTimeMillis();
-    std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
+    std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(*claim);
 
     if (nullptr == content_stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
@@ -402,7 +402,7 @@
     auto startTime = utils::timeutils::getTimeMillis();
     std::ifstream input;
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+    std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
     }
@@ -519,7 +519,7 @@
           claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
         }
         if (stream == nullptr) {
-          stream = process_context_->getContentRepository()->write(claim);
+          stream = process_context_->getContentRepository()->write(*claim);
         }
         if (stream == nullptr) {
           logger_->log_error("Stream is null");
@@ -939,9 +939,7 @@
         if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
           // original must be non-null since this flowFile is already stored in the repos ->
           // must have come from a session->get()
-          auto claim = original->getResourceClaim();
-          // decrement on behalf of the persisted-instance-to-be-deleted
-          if (claim) claim->decreaseFlowFileRecordOwnedCount();
+          assert(original);
           ff->setStoredToRepository(false);
         }
         continue;
@@ -951,7 +949,7 @@
       if (claim) claim->increaseFlowFileRecordOwnedCount();
       auto originalClaim = original ? original->getResourceClaim() : nullptr;
       // decrement on behalf of the overridden instance if any
-      if (ff->isStored() && originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
+      if (originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
 
       ff->setStoredToRepository(true);
     }
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 4f9b83d..ea89cdc 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -42,21 +42,22 @@
 void FileSystemRepository::stop() {
 }
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
-  return std::make_shared<io::FileStream>(claim->getContentFullPath(), append);
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const minifi::ResourceClaim &claim, bool append) {
+  return std::make_shared<io::FileStream>(claim.getContentFullPath(), append);
 }
 
-bool FileSystemRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
-  std::ifstream file(streamId->getContentFullPath());
+bool FileSystemRepository::exists(const minifi::ResourceClaim &streamId) {
+  std::ifstream file(streamId.getContentFullPath());
   return file.good();
 }
 
-std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
-  return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false);
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const minifi::ResourceClaim &claim) {
+  return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, false);
 }
 
-bool FileSystemRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
-  std::remove(claim->getContentFullPath().c_str());
+bool FileSystemRepository::remove(const minifi::ResourceClaim &claim) {
+  logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
+  std::remove(claim.getContentFullPath().c_str());
   return true;
 }
 
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index d626354..d7039cf 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -38,17 +38,6 @@
 
 bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {
   VolatileRepository::initialize(configure);
-  resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) {
-    if (lhsPtr == nullptr || rhsPtr == nullptr) {
-      return false;
-    }
-    return lhsPtr->getContentFullPath() == rhsPtr->getContentFullPath();};
-  resource_claim_check_ = [](std::shared_ptr<minifi::ResourceClaim> claim) {
-    return claim->getFlowFileRecordOwnedCount() <= 0;};
-  claim_reclaimer_ = [&](std::shared_ptr<minifi::ResourceClaim> claim) {if (claim->getFlowFileRecordOwnedCount() <= 0) {
-      remove(claim);
-    }
-  };
 
   if (configure != nullptr) {
     bool minimize_locking = false;
@@ -89,52 +78,52 @@
   logger_->log_info("%s Repository Monitor Thread Start", getName());
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
-  logger_->log_info("enter write for %s", claim->getContentFullPath());
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
+  logger_->log_info("enter write for %s", claim.getContentFullPath());
   {
     std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_check = master_list_.find(claim->getContentFullPath());
+    auto claim_check = master_list_.find(claim.getContentFullPath());
     if (claim_check != master_list_.end()) {
       logger_->log_info("Creating copy of atomic entry");
       auto ent = claim_check->second->takeOwnership();
       if (ent == nullptr) {
         return nullptr;
       }
-      return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+      return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
     }
   }
 
   int size = 0;
   if (LIKELY(minimize_locking_ == true)) {
     for (auto ent : value_vector_) {
-      if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
+      if (ent->testAndSetKey(claim.getContentFullPath())) {
         std::lock_guard<std::mutex> lock(map_mutex_);
-        master_list_[claim->getContentFullPath()] = ent;
-        logger_->log_info("Minimize locking, return stream for %s", claim->getContentFullPath());
-        return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+        master_list_[claim.getContentFullPath()] = ent;
+        logger_->log_info("Minimize locking, return stream for %s", claim.getContentFullPath());
+        return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
       }
       size++;
     }
   } else {
     std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_check = master_list_.find(claim->getContentFullPath());
+    auto claim_check = master_list_.find(claim.getContentFullPath());
     if (claim_check != master_list_.end()) {
-      return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second);
+      return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), claim_check->second);
     } else {
-      AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(&current_size_, &max_size_);
-      if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
-        master_list_[claim->getContentFullPath()] = ent;
-        return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+      auto *ent = new AtomicEntry<ResourceClaim::Path>(&current_size_, &max_size_);
+      if (ent->testAndSetKey(claim.getContentFullPath())) {
+        master_list_[claim.getContentFullPath()] = ent;
+        return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
       }
     }
   }
-  logger_->log_info("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim->getContentFullPath(), size);
+  logger_->log_info("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim.getContentFullPath(), size);
   return nullptr;
 }
 
-bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) {
   std::lock_guard<std::mutex> lock(map_mutex_);
-  auto claim_check = master_list_.find(claim->getContentFullPath());
+  auto claim_check = master_list_.find(claim.getContentFullPath());
   if (claim_check != master_list_.end()) {
     auto ent = claim_check->second->takeOwnership();
     if (ent == nullptr) {
@@ -146,53 +135,53 @@
   return false;
 }
 
-std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) {
   std::lock_guard<std::mutex> lock(map_mutex_);
-  auto claim_check = master_list_.find(claim->getContentFullPath());
+  auto claim_check = master_list_.find(claim.getContentFullPath());
   if (claim_check != master_list_.end()) {
     auto ent = claim_check->second->takeOwnership();
     if (ent == nullptr) {
       return nullptr;
     }
-    return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+    return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
   }
 
   return nullptr;
 }
 
-bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+bool VolatileContentRepository::remove(const minifi::ResourceClaim &claim) {
   if (LIKELY(minimize_locking_ == true)) {
     std::lock_guard<std::mutex> lock(map_mutex_);
-    auto ent = master_list_.find(claim->getContentFullPath());
+    auto ent = master_list_.find(claim.getContentFullPath());
     if (ent != master_list_.end()) {
       auto ptr = ent->second;
       // if we cannot remove the entry we will let the owner's destructor
       // decrement the reference count and free it
-      master_list_.erase(claim->getContentFullPath());
+      master_list_.erase(claim.getContentFullPath());
       // because of the test and set we need to decrement ownership
       ptr->decrementOwnership();
-      if (ptr->freeValue(claim)) {
-        logger_->log_info("Removed %s", claim->getContentFullPath());
+      if (ptr->freeValue(claim.getContentFullPath())) {
+        logger_->log_info("Deleting resource %s", claim.getContentFullPath());
         return true;
       } else {
-        logger_->log_info("free failed for %s", claim->getContentFullPath());
+        logger_->log_info("free failed for %s", claim.getContentFullPath());
       }
     } else {
-      logger_->log_info("Could not remove %s", claim->getContentFullPath());
+      logger_->log_info("Could not remove %s", claim.getContentFullPath());
     }
   } else {
     std::lock_guard<std::mutex> lock(map_mutex_);
-    auto claim_item = master_list_.find(claim->getContentFullPath());
+    auto claim_item = master_list_.find(claim.getContentFullPath());
     if (claim_item != master_list_.end()) {
       auto size = claim_item->second->getLength();
       delete claim_item->second;
-      master_list_.erase(claim->getContentFullPath());
+      master_list_.erase(claim.getContentFullPath());
       current_size_ -= size;
     }
     return true;
   }
 
-  logger_->log_info("Could not remove %s, may not exist", claim->getContentFullPath());
+  logger_->log_info("Could not remove %s, may not exist", claim.getContentFullPath());
   return false;
 }
 
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index 79c8f7d..c90a39d 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -169,13 +169,6 @@
   converted_ = uuidStr;
 }
 
-uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
-
-NonRepeatingStringGenerator::NonRepeatingStringGenerator()
-    : prefix_((std::to_string(timestamp) + "-")),
-      incrementor_(0) {
-}
-
 IdGenerator::IdGenerator()
     : implementation_(UUID_TIME_IMPL),
       logger_(logging::LoggerFactory<IdGenerator>::getLogger()),
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 4844729..1ba607b 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -33,12 +33,13 @@
 #include "../TestBase.h"
 #include "../../extensions/libarchive/MergeContent.h"
 #include "../test/BufferReader.h"
+#include "core/repository/VolatileFlowFileRepository.h"
 
 using Connection = minifi::Connection;
 using MergeContent = minifi::processors::MergeContent;
 
 struct TestFlow{
-  TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
+  TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
         const std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
       : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
     // setup processor
@@ -119,7 +120,7 @@
 
   std::shared_ptr<core::Processor> inputProcessor;
   std::shared_ptr<core::Processor> processor;
-  std::shared_ptr<core::repository::FlowFileRepository> ff_repository;
+  std::shared_ptr<core::Repository> ff_repository;
   std::shared_ptr<core::ContentRepository> content_repo;
   std::shared_ptr<core::Repository> prov_repo;
   std::shared_ptr<core::ProcessContext> inputContext;
@@ -244,8 +245,11 @@
   TestController testController;
   LogTestController::getInstance().setDebug<core::ContentRepository>();
   LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
   LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
   LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+  LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
+  LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
 
   char format[] = "/var/tmp/test.XXXXXX";
   auto dir = testController.createTempDirectory(format);
@@ -255,8 +259,16 @@
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
-  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo;
+  SECTION("VolatileContentRepository") {
+    testController.getLogger()->log_info("Using VolatileContentRepository");
+    content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  }
+  SECTION("FileSystemContentRepository") {
+    testController.getLogger()->log_info("Using FileSystemRepository");
+    content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  }
   ff_repository->initialize(config);
   content_repo->initialize(config);
 
@@ -269,23 +281,29 @@
     flowController->load(flow.root);
     ff_repository->start();
 
-    // write two files into the input
-    auto flowFile = flow.write("data");
-    auto claim = flowFile->getResourceClaim();
-    // one from the FlowFile and one from the persisted instance
-    REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
-    // update them with the Merge Processor
-    flow.trigger();
+    std::string removedResource;
+    {
+      // write two files into the input
+      auto flowFile = flow.write("data");
+      auto claim = flowFile->getResourceClaim();
+      removedResource = claim->getContentFullPath();
+      // one from the FlowFile and one from the persisted instance
+      REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
+      // update them with the Merge Processor
+      flow.trigger();
 
-    auto content = flow.read(flowFile);
-    REQUIRE(content == "<override>");
-    auto newClaim = flowFile->getResourceClaim();
-    // the processor added new content to the flowFile
-    REQUIRE(claim != newClaim);
-    // nobody holds an owning reference to the previous claim
-    REQUIRE(claim->getFlowFileRecordOwnedCount() == 0);
-    // one from the FlowFile and one from the persisted instance
-    REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
+      auto content = flow.read(flowFile);
+      REQUIRE(content == "<override>");
+      auto newClaim = flowFile->getResourceClaim();
+      // the processor added new content to the flowFile
+      REQUIRE(claim != newClaim);
+      // only this instance behind this shared_ptr keeps the resource alive
+      REQUIRE(claim->getFlowFileRecordOwnedCount() == 1);
+      // one from the FlowFile and one from the persisted instance
+      REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
+    }
+    REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource " + removedResource) == 1);
+    REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource") == 1);
 
     ff_repository->stop();
     flowController->unload();
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 89d9bca..a6717f7 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -39,7 +39,7 @@
 
 
   auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(claim);
+  auto stream = content_repo->write(*claim);
 
   stream->writeUTF("well hello there");
 
@@ -55,7 +55,7 @@
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
   REQUIRE(content_repo->initialize(configuration));
 
-  auto read_stream = content_repo->read(claim);
+  auto read_stream = content_repo->read(*claim);
 
   std::string readstr;
   read_stream->readUTF(readstr);
@@ -80,7 +80,7 @@
 
 
   auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(claim);
+  auto stream = content_repo->write(*claim);
 
   stream->writeUTF("well hello there");
 
@@ -97,9 +97,9 @@
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
   REQUIRE(content_repo->initialize(configuration));
 
-  content_repo->remove(claim);
+  content_repo->remove(*claim);
 
-  auto read_stream = content_repo->read(claim);
+  auto read_stream = content_repo->read(*claim);
 
   std::string readstr;
 
@@ -118,7 +118,7 @@
   REQUIRE(content_repo->initialize(configuration));
 
   auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(claim);
+  auto stream = content_repo->write(*claim);
 
   // we're writing nothing to the stream.
 
@@ -135,7 +135,7 @@
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
   REQUIRE(content_repo->initialize(configuration));
 
-  auto read_stream = content_repo->read(claim);
+  auto read_stream = content_repo->read(*claim);
 
   std::string readstr;
 
@@ -143,68 +143,7 @@
   REQUIRE(read_stream->readUTF(readstr) == -1);
 }
 
-TEST_CASE("Test Null Claim", "[TestDBCR4]") {
-  TestController testController;
-  char format[] = "/var/tmp/testRepo.XXXXXX";
-  auto dir = testController.createTempDirectory(format);
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
-
-  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
-  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
-  REQUIRE(content_repo->initialize(configuration));
-
-
-  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(nullptr);
-
-  REQUIRE(stream == nullptr);
-
-  auto read_stream = content_repo->write(nullptr);
-
-  REQUIRE(read_stream == nullptr);
-}
-
-TEST_CASE("Delete Null Claim", "[TestDBCR5]") {
-  TestController testController;
-  char format[] = "/var/tmp/testRepo.XXXXXX";
-  auto dir = testController.createTempDirectory(format);
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
-
-  auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
-  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
-  REQUIRE(content_repo->initialize(configuration));
-
-  auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(claim);
-
-  stream->writeUTF("well hello there");
-
-  stream->closeStream();
-
-  content_repo->stop();
-
-  // reclaim the memory
-  content_repo = nullptr;
-
-  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
-
-  configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
-  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
-  REQUIRE(content_repo->initialize(configuration));
-
-  REQUIRE(!content_repo->remove(nullptr));
-
-  auto read_stream = content_repo->read(claim);
-
-  std::string readstr;
-
-  // -1 tell us we have an invalid stream
-  read_stream->readUTF(readstr);
-
-  REQUIRE(readstr == "well hello there");
-}
-
-TEST_CASE("Delete NonExistent Claim", "[TestDBCR5]") {
+TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
   TestController testController;
   char format[] = "/var/tmp/testRepo.XXXXXX";
   auto dir = testController.createTempDirectory(format);
@@ -216,7 +155,7 @@
 
   auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
   auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(claim);
+  auto stream = content_repo->write(*claim);
 
   stream->writeUTF("well hello there");
 
@@ -234,9 +173,9 @@
   REQUIRE(content_repo->initialize(configuration));
 
   // we won't complain if it does not exist
-  REQUIRE(content_repo->remove(claim2));
+  REQUIRE(content_repo->remove(*claim2));
 
-  auto read_stream = content_repo->read(claim);
+  auto read_stream = content_repo->read(*claim);
 
   std::string readstr;
 
@@ -246,7 +185,7 @@
   REQUIRE(readstr == "well hello there");
 }
 
-TEST_CASE("Delete Remove Count Claim", "[TestDBCR6]") {
+TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
   TestController testController;
   char format[] = "/var/tmp/testRepo.XXXXXX";
   auto dir = testController.createTempDirectory(format);
@@ -258,7 +197,7 @@
 
   auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
   auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo);
-  auto stream = content_repo->write(claim);
+  auto stream = content_repo->write(*claim);
 
   stream->writeUTF("well hello there");
 
@@ -278,14 +217,13 @@
   // increment twice. verify we have 2 for the stream count
   // and then test the removal and verify that the claim was removed by virtue of obtaining
   // its count.
-  content_repo->incrementStreamCount(claim2);
-  content_repo->incrementStreamCount(claim2);
-  REQUIRE(content_repo->getStreamCount(claim2) == 2);
-  content_repo->decrementStreamCount(claim2);
-  content_repo->decrementStreamCount(claim2);
-  REQUIRE(content_repo->removeIfOrphaned(claim2));
-  REQUIRE(content_repo->getStreamCount(claim2) == 0);
-  auto read_stream = content_repo->read(claim);
+  content_repo->incrementStreamCount(*claim2);
+  content_repo->incrementStreamCount(*claim2);
+  REQUIRE(content_repo->getStreamCount(*claim2) == 2);
+  content_repo->decrementStreamCount(*claim2);
+  REQUIRE(content_repo->decrementStreamCount(*claim2) == core::StreamManager<minifi::ResourceClaim>::StreamState::Deleted);
+  REQUIRE(content_repo->getStreamCount(*claim2) == 0);
+  auto read_stream = content_repo->read(*claim);
 
   std::string readstr;
 
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index aa2ad84..fefea64 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -162,9 +162,10 @@
 
   repository->loadComponent(content_repo);
 
-  std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
 
   {
+    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
+
     minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
 
     record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index 2fc2e7e..db65427 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -343,7 +343,7 @@
     auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
     if((*content_repo_ptr) && (ff->keepContent == 0)) {
       auto claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
-      (*content_repo_ptr)->remove(claim);
+      (*content_repo_ptr)->remove(*claim);
     }
     delete content_repo_ptr;
   }
@@ -451,7 +451,7 @@
   if(ff->crp && (*content_repo)) {
     std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
                                                                                            *content_repo);
-    auto stream = (*content_repo)->read(claim);
+    auto stream = (*content_repo)->read(*claim);
     return stream->read(target, size);
   } else {
     file_buffer fb = file_to_buffer(ff->contentLocation);
@@ -699,7 +699,7 @@
     if(input_ff->crp && (*content_repo)) {
       std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
                                                                                              *content_repo);
-      ff_data->content_stream = (*content_repo)->read(claim);
+      ff_data->content_stream = (*content_repo)->read(*claim);
     } else {
       ff_data->content_stream = std::make_shared<minifi::io::DataStream>();
       file_buffer fb = file_to_buffer(input_ff->contentLocation);