MINIFICPP-1309 - RAII based resource ownership
MINIFICPP-1309 - Use path as resource id
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #857
diff --git a/extensions/expression-language/tests/ExpressionLanguageTests.cpp b/extensions/expression-language/tests/ExpressionLanguageTests.cpp
index d61a003..3a08ad5 100644
--- a/extensions/expression-language/tests/ExpressionLanguageTests.cpp
+++ b/extensions/expression-language/tests/ExpressionLanguageTests.cpp
@@ -38,11 +38,6 @@
namespace expression = org::apache::nifi::minifi::expression;
-class MockFlowFile : public core::FlowFile {
- void releaseClaim(const std::shared_ptr<minifi::ResourceClaim> claim) override {
- }
-};
-
TEST_CASE("Trivial static expression", "[expressionLanguageTestTrivialStaticExpr]") { // NOLINT
REQUIRE("a" == expression::make_static("a")({ }).asString());
}
@@ -58,7 +53,7 @@
}
TEST_CASE("Attribute expression", "[expressionLanguageTestAttributeExpression]") { // NOLINT
- auto flow_file = std::make_shared<MockFlowFile>();
+ auto flow_file = std::make_shared<core::FlowFile>();
flow_file->addAttribute("attr_a", "__attr_value_a__");
auto expr = expression::compile("text_before${attr_a}text_after");
REQUIRE("text_before__attr_value_a__text_after" == expr({ flow_file }).asString());
@@ -66,12 +61,12 @@
TEST_CASE("Attribute expression (Null)", "[expressionLanguageTestAttributeExpressionNull]") { // NOLINT
auto expr = expression::compile("text_before${attr_a}text_after");
- std::shared_ptr<MockFlowFile> flow_file = nullptr;
- REQUIRE("text_beforetext_after" == expr({ flow_file }).asString());
+ std::shared_ptr<core::FlowFile> flow_file = nullptr;
+ REQUIRE("text_beforetext_after" == expr( { flow_file }).asString());
}
TEST_CASE("Multi-attribute expression", "[expressionLanguageTestMultiAttributeExpression]") { // NOLINT
- auto flow_file = std::make_shared<MockFlowFile>();
+ auto flow_file = std::make_shared<core::FlowFile>();
flow_file->addAttribute("attr_a", "__attr_value_a__");
flow_file->addAttribute("attr_b", "__attr_value_b__");
auto expr = expression::compile("text_before${attr_a}text_between${attr_b}text_after");
@@ -82,17 +77,17 @@
"[expressionLanguageTestMultiFlowfileAttributeExpression]") { // NOLINT
auto expr = expression::compile("text_before${attr_a}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
- auto flow_file_b = std::make_shared<MockFlowFile>();
+ auto flow_file_b = std::make_shared<core::FlowFile>();
flow_file_b->addAttribute("attr_a", "__flow_b_attr_value_a__");
REQUIRE("text_before__flow_b_attr_value_a__text_after" == expr({ flow_file_b }).asString());
}
TEST_CASE("Attribute expression with whitespace", "[expressionLanguageTestAttributeExpressionWhitespace]") { // NOLINT
- auto flow_file = std::make_shared<MockFlowFile>();
+ auto flow_file = std::make_shared<core::FlowFile>();
flow_file->addAttribute("attr_a", "__attr_value_a__");
auto expr = expression::compile("text_before${\n\tattr_a \r}text_after");
REQUIRE("text_before__attr_value_a__text_after" == expr({ flow_file }).asString());
@@ -101,7 +96,7 @@
TEST_CASE("Special characters expression", "[expressionLanguageTestSpecialCharactersExpression]") { // NOLINT
auto expr = expression::compile("text_before|{}()[],:;\\/*#'\" \t\r\n${attr_a}}()text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
REQUIRE("text_before|{}()[],:;\\/*#'\" \t\r\n__flow_a_attr_value_a__}()text_after" == expr({ flow_file_a }).asString());
}
@@ -109,7 +104,7 @@
TEST_CASE("UTF-8 characters expression", "[expressionLanguageTestUTF8Expression]") { // NOLINT
auto expr = expression::compile("text_before¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹${attr_a}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
REQUIRE("text_before¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
}
@@ -117,7 +112,7 @@
TEST_CASE("UTF-8 characters attribute", "[expressionLanguageTestUTF8Attribute]") { // NOLINT
auto expr = expression::compile("text_before${attr_a}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹__");
REQUIRE("text_before__¥£€¢₡₢₣₤₥₦₧₨₩₪₫₭₮₯₹__text_after" == expr({ flow_file_a }).asString());
}
@@ -125,7 +120,7 @@
TEST_CASE("Single quoted attribute expression", "[expressionLanguageTestSingleQuotedAttributeExpression]") { // NOLINT
auto expr = expression::compile("text_before${'|{}()[],:;\\\\/*# \t\r\n$'}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("|{}()[],:;\\/*# \t\r\n$", "__flow_a_attr_value_a__");
REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
}
@@ -133,7 +128,7 @@
TEST_CASE("Double quoted attribute expression", "[expressionLanguageTestDoubleQuotedAttributeExpression]") { // NOLINT
auto expr = expression::compile("text_before${\"|{}()[],:;\\\\/*# \t\r\n$\"}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("|{}()[],:;\\/*# \t\r\n$", "__flow_a_attr_value_a__");
REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
}
@@ -148,22 +143,22 @@
expected.append(hostname);
expected.append("text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
- REQUIRE(expected == expr({ flow_file_a }).asString());
+ auto flow_file_a = std::make_shared<core::FlowFile>();
+ REQUIRE(expected == expr( { flow_file_a }).asString());
}
TEST_CASE("ToUpper function", "[expressionLanguageTestToUpperFunction]") { // NOLINT
auto expr = expression::compile(R"(text_before${
attr_a : toUpper()
}text_after)");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
REQUIRE("text_before__FLOW_A_ATTR_VALUE_A__text_after" == expr({ flow_file_a }).asString());
}
TEST_CASE("ToUpper function w/o whitespace", "[expressionLanguageTestToUpperFunctionWithoutWhitespace]") { // NOLINT
auto expr = expression::compile(R"(text_before${attr_a:toUpper()}text_after)");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__flow_a_attr_value_a__");
REQUIRE("text_before__FLOW_A_ATTR_VALUE_A__text_after" == expr({ flow_file_a }).asString());
}
@@ -172,7 +167,7 @@
auto expr = expression::compile(R"(text_before${
attr_a : toLower()
}text_after)");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr_a", "__FLOW_A_ATTR_VALUE_A__");
REQUIRE("text_before__flow_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
}
@@ -250,7 +245,7 @@
TEST_CASE("Substring 2 arg", "[expressionLanguageSubstring2]") { // NOLINT
auto expr = expression::compile("text_before${attr:substring(6, 8)}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
REQUIRE("text_before_a_attr_text_after" == expr({ flow_file_a }).asString());
}
@@ -258,7 +253,7 @@
TEST_CASE("Substring 1 arg", "[expressionLanguageSubstring1]") { // NOLINT
auto expr = expression::compile("text_before${attr:substring(6)}text_after");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
REQUIRE("text_before_a_attr_value_a__text_after" == expr({ flow_file_a }).asString());
}
@@ -266,7 +261,7 @@
TEST_CASE("Substring Before", "[expressionLanguageSubstringBefore]") { // NOLINT
auto expr = expression::compile("${attr:substringBefore('attr_value_a__')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
REQUIRE("__flow_a_" == expr({ flow_file_a }).asString());
}
@@ -274,7 +269,7 @@
TEST_CASE("Substring Before Last", "[expressionLanguageSubstringBeforeLast]") { // NOLINT
auto expr = expression::compile("${attr:substringBeforeLast('_a')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
REQUIRE("__flow_a_attr_value" == expr({ flow_file_a }).asString());
}
@@ -282,7 +277,7 @@
TEST_CASE("Substring After", "[expressionLanguageSubstringAfter]") { // NOLINT
auto expr = expression::compile("${attr:substringAfter('__flow_a')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
REQUIRE("_attr_value_a__" == expr({ flow_file_a }).asString());
}
@@ -290,7 +285,7 @@
TEST_CASE("Substring After Last", "[expressionLanguageSubstringAfterLast]") { // NOLINT
auto expr = expression::compile("${attr:substringAfterLast('_a')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "__flow_a_attr_value_a__");
REQUIRE("__" == expr({ flow_file_a }).asString());
}
@@ -298,7 +293,7 @@
TEST_CASE("Get Delimited", "[expressionLanguageGetDelimited]") { // NOLINT
auto expr = expression::compile("${attr:getDelimitedField(2)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "\"Jacobson, John\", 32, Mr.");
REQUIRE(" 32" == expr({ flow_file_a }).asString());
}
@@ -306,7 +301,7 @@
TEST_CASE("Get Delimited 2", "[expressionLanguageGetDelimited2]") { // NOLINT
auto expr = expression::compile("${attr:getDelimitedField(1)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "\"Jacobson, John\", 32, Mr.");
REQUIRE("\"Jacobson, John\"" == expr({ flow_file_a }).asString());
}
@@ -314,7 +309,7 @@
TEST_CASE("Get Delimited 3", "[expressionLanguageGetDelimited3]") { // NOLINT
auto expr = expression::compile("${attr:getDelimitedField(1, ',', '\\\"', '\\\\', 'true')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "\"Jacobson, John\", 32, Mr.");
REQUIRE("Jacobson, John" == expr({ flow_file_a }).asString());
}
@@ -322,7 +317,7 @@
TEST_CASE("Starts With", "[expressionLanguageStartsWith]") { // NOLINT
auto expr = expression::compile("${attr:startsWith('a brand')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "A BRAND TEST");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -330,7 +325,7 @@
TEST_CASE("Starts With 2", "[expressionLanguageStartsWith2]") { // NOLINT
auto expr = expression::compile("${attr:startsWith('a brand')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand TEST");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -338,7 +333,7 @@
TEST_CASE("Ends With", "[expressionLanguageEndsWith]") { // NOLINT
auto expr = expression::compile("${attr:endsWith('txt')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.TXT");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -346,7 +341,7 @@
TEST_CASE("Ends With 2", "[expressionLanguageEndsWith2]") { // NOLINT
auto expr = expression::compile("${attr:endsWith('TXT')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.TXT");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -354,7 +349,7 @@
TEST_CASE("Contains", "[expressionLanguageContains]") { // NOLINT
auto expr = expression::compile("${attr:contains('new')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -362,7 +357,7 @@
TEST_CASE("Contains 2", "[expressionLanguageContains2]") { // NOLINT
auto expr = expression::compile("${attr:contains('NEW')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -370,7 +365,7 @@
TEST_CASE("In", "[expressionLanguageIn]") { // NOLINT
auto expr = expression::compile("${attr:in('PAUL', 'JOHN', 'MIKE')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "JOHN");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -378,7 +373,7 @@
TEST_CASE("In 2", "[expressionLanguageIn2]") { // NOLINT
auto expr = expression::compile("${attr:in('RED', 'GREEN', 'BLUE')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "JOHN");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -399,7 +394,7 @@
TEST_CASE("Replace", "[expressionLanguageReplace]") { // NOLINT
auto expr = expression::compile("${attr:replace('.', '_')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a brand new filename_txt" == expr({ flow_file_a }).asString());
}
@@ -407,7 +402,7 @@
TEST_CASE("Replace 2", "[expressionLanguageReplace2]") { // NOLINT
auto expr = expression::compile("${attr:replace(' ', '.')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a.brand.new.filename.txt" == expr({ flow_file_a }).asString());
}
@@ -415,7 +410,7 @@
TEST_CASE("Replace First", "[expressionLanguageReplaceFirst]") { // NOLINT
auto expr = expression::compile("${attr:replaceFirst('a', 'the')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("the brand new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -423,7 +418,7 @@
TEST_CASE("Replace First Regex", "[expressionLanguageReplaceFirstRegex]") { // NOLINT
auto expr = expression::compile("${attr:replaceFirst('[br]', 'g')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a grand new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -431,7 +426,7 @@
TEST_CASE("Replace All", "[expressionLanguageReplaceAll]") { // NOLINT
auto expr = expression::compile("${attr:replaceAll('\\\\..*', '')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a brand new filename" == expr({ flow_file_a }).asString());
}
@@ -439,7 +434,7 @@
TEST_CASE("Replace All 2", "[expressionLanguageReplaceAll2]") { // NOLINT
auto expr = expression::compile("${attr:replaceAll('a brand (new)', '$1')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -447,7 +442,7 @@
TEST_CASE("Replace All 3", "[expressionLanguageReplaceAll3]") { // NOLINT
auto expr = expression::compile("${attr:replaceAll('XYZ', 'ZZZ')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -455,7 +450,7 @@
TEST_CASE("Replace Null", "[expressionLanguageReplaceNull]") { // NOLINT
auto expr = expression::compile("${attr:replaceNull('abc')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -463,7 +458,7 @@
TEST_CASE("Replace Null 2", "[expressionLanguageReplaceNull2]") { // NOLINT
auto expr = expression::compile("${attr:replaceNull('abc')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr2", "a brand new filename.txt");
REQUIRE("abc" == expr({ flow_file_a }).asString());
}
@@ -471,7 +466,7 @@
TEST_CASE("Replace Empty", "[expressionLanguageReplaceEmpty]") { // NOLINT
auto expr = expression::compile("${attr:replaceEmpty('abc')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -479,7 +474,7 @@
TEST_CASE("Replace Empty 2", "[expressionLanguageReplaceEmpty2]") { // NOLINT
auto expr = expression::compile("${attr:replaceEmpty('abc')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", " \t \r \n ");
REQUIRE("abc" == expr({ flow_file_a }).asString());
}
@@ -487,7 +482,7 @@
TEST_CASE("Replace Empty 3", "[expressionLanguageReplaceEmpty2]") { // NOLINT
auto expr = expression::compile("${attr:replaceEmpty('abc')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr2", "test");
REQUIRE("abc" == expr({ flow_file_a }).asString());
}
@@ -495,7 +490,7 @@
TEST_CASE("Matches", "[expressionLanguageMatches]") { // NOLINT
auto expr = expression::compile("${attr:matches('^(Ct|Bt|At):.*t$')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "At:est");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -503,7 +498,7 @@
TEST_CASE("Matches 2", "[expressionLanguageMatches2]") { // NOLINT
auto expr = expression::compile("${attr:matches('^(Ct|Bt|At):.*t$')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "At:something");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -511,7 +506,7 @@
TEST_CASE("Matches 3", "[expressionLanguageMatches3]") { // NOLINT
auto expr = expression::compile("${attr:matches('(Ct|Bt|At):.*t')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", " At:est");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -519,7 +514,7 @@
TEST_CASE("Find", "[expressionLanguageFind]") { // NOLINT
auto expr = expression::compile("${attr:find('a [Bb]rand [Nn]ew')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -527,7 +522,7 @@
TEST_CASE("Find 2", "[expressionLanguageFind2]") { // NOLINT
auto expr = expression::compile("${attr:find('Brand.*')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -535,7 +530,7 @@
TEST_CASE("Find 3", "[expressionLanguageFind3]") { // NOLINT
auto expr = expression::compile("${attr:find('brand')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -543,7 +538,7 @@
TEST_CASE("IndexOf", "[expressionLanguageIndexOf]") { // NOLINT
auto expr = expression::compile("${attr:indexOf('a.*txt')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("-1" == expr({ flow_file_a }).asString());
}
@@ -551,7 +546,7 @@
TEST_CASE("IndexOf2", "[expressionLanguageIndexOf2]") { // NOLINT
auto expr = expression::compile("${attr:indexOf('.')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("20" == expr({ flow_file_a }).asString());
}
@@ -559,7 +554,7 @@
TEST_CASE("IndexOf3", "[expressionLanguageIndexOf3]") { // NOLINT
auto expr = expression::compile("${attr:indexOf('a')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("0" == expr({ flow_file_a }).asString());
}
@@ -567,7 +562,7 @@
TEST_CASE("IndexOf4", "[expressionLanguageIndexOf4]") { // NOLINT
auto expr = expression::compile("${attr:indexOf(' ')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("1" == expr({ flow_file_a }).asString());
}
@@ -575,7 +570,7 @@
TEST_CASE("LastIndexOf", "[expressionLanguageLastIndexOf]") { // NOLINT
auto expr = expression::compile("${attr:lastIndexOf('a.*txt')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("-1" == expr({ flow_file_a }).asString());
}
@@ -583,7 +578,7 @@
TEST_CASE("LastIndexOf2", "[expressionLanguageLastIndexOf2]") { // NOLINT
auto expr = expression::compile("${attr:lastIndexOf('.')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("20" == expr({ flow_file_a }).asString());
}
@@ -591,7 +586,7 @@
TEST_CASE("LastIndexOf3", "[expressionLanguageLastIndexOf3]") { // NOLINT
auto expr = expression::compile("${attr:lastIndexOf('a')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("17" == expr({ flow_file_a }).asString());
}
@@ -599,7 +594,7 @@
TEST_CASE("LastIndexOf4", "[expressionLanguageLastIndexOf4]") { // NOLINT
auto expr = expression::compile("${attr:lastIndexOf(' ')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "a brand new filename.txt");
REQUIRE("11" == expr({ flow_file_a }).asString());
}
@@ -609,7 +604,7 @@
TEST_CASE("Plus Integer", "[expressionLanguagePlusInteger]") { // NOLINT
auto expr = expression::compile("${attr:plus(13)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11");
REQUIRE("24" == expr({ flow_file_a }).asString());
}
@@ -617,7 +612,7 @@
TEST_CASE("Plus Decimal", "[expressionLanguagePlusDecimal]") { // NOLINT
auto expr = expression::compile("${attr:plus(-13.34567)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11.1");
REQUIRE("-2.24567" == expr({ flow_file_a }).asString());
}
@@ -625,7 +620,7 @@
TEST_CASE("Plus Exponent", "[expressionLanguagePlusExponent]") { // NOLINT
auto expr = expression::compile("${attr:plus(10e+6)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11");
REQUIRE("10000011" == expr({ flow_file_a }).asString());
}
@@ -633,7 +628,7 @@
TEST_CASE("Plus Exponent 2", "[expressionLanguagePlusExponent2]") { // NOLINT
auto expr = expression::compile("${attr:plus(10e+6)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11.345678901234");
REQUIRE("10000011.345678901234351" == expr({ flow_file_a }).asString());
}
@@ -641,7 +636,7 @@
TEST_CASE("Minus Integer", "[expressionLanguageMinusInteger]") { // NOLINT
auto expr = expression::compile("${attr:minus(13)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11");
REQUIRE("-2" == expr({ flow_file_a }).asString());
}
@@ -649,7 +644,7 @@
TEST_CASE("Minus Decimal", "[expressionLanguageMinusDecimal]") { // NOLINT
auto expr = expression::compile("${attr:minus(-13.34567)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11.1");
REQUIRE("24.44567" == expr({ flow_file_a }).asString());
}
@@ -657,7 +652,7 @@
TEST_CASE("Multiply Integer", "[expressionLanguageMultiplyInteger]") { // NOLINT
auto expr = expression::compile("${attr:multiply(13)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11");
REQUIRE("143" == expr({ flow_file_a }).asString());
}
@@ -665,7 +660,7 @@
TEST_CASE("Multiply Decimal", "[expressionLanguageMultiplyDecimal]") { // NOLINT
auto expr = expression::compile("${attr:multiply(-13.34567)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11.1");
REQUIRE("-148.136937" == expr({ flow_file_a }).asString());
}
@@ -673,7 +668,7 @@
TEST_CASE("Divide Integer", "[expressionLanguageDivideInteger]") { // NOLINT
auto expr = expression::compile("${attr:divide(13)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11");
REQUIRE("0.846153846153846" == expr({ flow_file_a }).asString());
}
@@ -681,7 +676,7 @@
TEST_CASE("Divide Decimal", "[expressionLanguageDivideDecimal]") { // NOLINT
auto expr = expression::compile("${attr:divide(-13.34567)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "11.1");
REQUIRE("-0.831730441409086" == expr({ flow_file_a }).asString());
}
@@ -689,7 +684,7 @@
TEST_CASE("To Radix", "[expressionLanguageToRadix]") { // NOLINT
auto expr = expression::compile("${attr:toRadix(2,16)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "10");
REQUIRE("0000000000001010" == expr({ flow_file_a }).asString());
}
@@ -697,7 +692,7 @@
TEST_CASE("To Radix 2", "[expressionLanguageToRadix2]") { // NOLINT
auto expr = expression::compile("${attr:toRadix(16)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "13");
REQUIRE("d" == expr({ flow_file_a }).asString());
}
@@ -705,7 +700,7 @@
TEST_CASE("To Radix 3", "[expressionLanguageToRadix3]") { // NOLINT
auto expr = expression::compile("${attr:toRadix(23,8)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "-2347");
REQUIRE("-000004a1" == expr({ flow_file_a }).asString());
}
@@ -713,7 +708,7 @@
TEST_CASE("From Radix", "[expressionLanguageFromRadix]") { // NOLINT
auto expr = expression::compile("${attr:fromRadix(2)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "0000000000001010");
REQUIRE("10" == expr({ flow_file_a }).asString());
}
@@ -721,7 +716,7 @@
TEST_CASE("From Radix 2", "[expressionLanguageFromRadix2]") { // NOLINT
auto expr = expression::compile("${attr:fromRadix(16)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "d");
REQUIRE("13" == expr({ flow_file_a }).asString());
}
@@ -729,7 +724,7 @@
TEST_CASE("From Radix 3", "[expressionLanguageFromRadix3]") { // NOLINT
auto expr = expression::compile("${attr:fromRadix(23)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "-000004a1");
REQUIRE("-2347" == expr({ flow_file_a }).asString());
}
@@ -737,15 +732,15 @@
TEST_CASE("Random", "[expressionLanguageRandom]") { // NOLINT
auto expr = expression::compile("${random()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
- auto result = expr({ flow_file_a }).asSignedLong();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
+ auto result = expr( { flow_file_a }).asSignedLong();
REQUIRE(result > 0);
}
TEST_CASE("Chained call", "[expressionChainedCall]") { // NOLINT
auto expr = expression::compile("${attr:multiply(3):plus(1)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE("22" == expr({ flow_file_a }).asString());
}
@@ -753,14 +748,14 @@
TEST_CASE("Chained call 2", "[expressionChainedCall2]") { // NOLINT
auto expr = expression::compile("${literal(10):multiply(2):plus(1):multiply(2)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
- REQUIRE(42 == expr({ flow_file_a }).asSignedLong());
+ auto flow_file_a = std::make_shared<core::FlowFile>();
+ REQUIRE(42 == expr( { flow_file_a }).asSignedLong());
}
TEST_CASE("Chained call 3", "[expressionChainedCall3]") { // NOLINT
auto expr = expression::compile("${literal(10):multiply(2):plus(${attr:multiply(2)}):multiply(${attr})}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE("238" == expr({ flow_file_a }).asString());
}
@@ -768,7 +763,7 @@
TEST_CASE("LiteralBool", "[expressionLiteralBool]") { // NOLINT
auto expr = expression::compile("${literal(true)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE(true == expr({ flow_file_a }).asBoolean());
}
@@ -776,7 +771,7 @@
TEST_CASE("LiteralBool 2", "[expressionLiteralBool2]") { // NOLINT
auto expr = expression::compile("${literal(false)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE(false == expr({ flow_file_a }).asBoolean());
}
@@ -784,7 +779,7 @@
TEST_CASE("Is Null", "[expressionIsNull]") { // NOLINT
auto expr = expression::compile("${filename:isNull()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -792,7 +787,7 @@
TEST_CASE("Is Null 2", "[expressionIsNull2]") { // NOLINT
auto expr = expression::compile("${filename:isNull()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "7");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -800,7 +795,7 @@
TEST_CASE("Not Null", "[expressionNotNull]") { // NOLINT
auto expr = expression::compile("${filename:notNull()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -808,7 +803,7 @@
TEST_CASE("Not Null 2", "[expressionNotNull2]") { // NOLINT
auto expr = expression::compile("${filename:notNull()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "7");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -816,7 +811,7 @@
TEST_CASE("Is Empty", "[expressionIsEmpty]") { // NOLINT
auto expr = expression::compile("${filename:isEmpty()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -824,7 +819,7 @@
TEST_CASE("Is Empty 2", "[expressionIsEmpty2]") { // NOLINT
auto expr = expression::compile("${attr:isEmpty()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "7");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -832,7 +827,7 @@
TEST_CASE("Is Empty 3", "[expressionIsEmpty3]") { // NOLINT
auto expr = expression::compile("${attr:isEmpty()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", " \t\r\n ");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -840,7 +835,7 @@
TEST_CASE("Is Empty 4", "[expressionIsEmpty4]") { // NOLINT
auto expr = expression::compile("${attr:isEmpty()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -848,7 +843,7 @@
TEST_CASE("Is Empty 5", "[expressionIsEmpty5]") { // NOLINT
auto expr = expression::compile("${attr:isEmpty()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", " \t\r\n a \t\r\n ");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -856,7 +851,7 @@
TEST_CASE("Equals", "[expressionEquals]") { // NOLINT
auto expr = expression::compile("${attr:equals('hello.txt')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "hello.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -864,7 +859,7 @@
TEST_CASE("Equals 2", "[expressionEquals2]") { // NOLINT
auto expr = expression::compile("${attr:equals('hello.txt')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "helllo.txt");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -872,7 +867,7 @@
TEST_CASE("Equals 3", "[expressionEquals3]") { // NOLINT
auto expr = expression::compile("${attr:plus(5):equals(6)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -880,7 +875,7 @@
TEST_CASE("Equals Ignore Case", "[expressionEqualsIgnoreCase]") { // NOLINT
auto expr = expression::compile("${attr:equalsIgnoreCase('hElLo.txt')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "hello.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -888,7 +883,7 @@
TEST_CASE("Equals Ignore Case 2", "[expressionEqualsIgnoreCase2]") { // NOLINT
auto expr = expression::compile("${attr:plus(5):equalsIgnoreCase(6)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -896,7 +891,7 @@
TEST_CASE("GT", "[expressionGT]") { // NOLINT
auto expr = expression::compile("${attr:plus(5):gt(5)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -904,7 +899,7 @@
TEST_CASE("GT2", "[expressionGT2]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):gt(6.05)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -912,7 +907,7 @@
TEST_CASE("GT3", "[expressionGT3]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):gt(6.15)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -920,7 +915,7 @@
TEST_CASE("GE", "[expressionGE]") { // NOLINT
auto expr = expression::compile("${attr:plus(5):ge(6)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -928,7 +923,7 @@
TEST_CASE("GE2", "[expressionGE2]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):ge(6.05)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -936,7 +931,7 @@
TEST_CASE("GE3", "[expressionGE3]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):ge(6.15)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -944,7 +939,7 @@
TEST_CASE("LT", "[expressionLT]") { // NOLINT
auto expr = expression::compile("${attr:plus(5):lt(5)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -952,7 +947,7 @@
TEST_CASE("LT2", "[expressionLT2]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):lt(6.05)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -960,7 +955,7 @@
TEST_CASE("LT3", "[expressionLT3]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):lt(6.15)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -968,7 +963,7 @@
TEST_CASE("LE", "[expressionLE]") { // NOLINT
auto expr = expression::compile("${attr:plus(5):le(6)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -976,7 +971,7 @@
TEST_CASE("LE2", "[expressionLE2]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):le(6.05)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -984,7 +979,7 @@
TEST_CASE("LE3", "[expressionLE3]") { // NOLINT
auto expr = expression::compile("${attr:plus(5.1):le(6.15)}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("attr", "1");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -992,7 +987,7 @@
TEST_CASE("And", "[expressionAnd]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('an')})}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -1000,7 +995,7 @@
TEST_CASE("And 2", "[expressionAnd2]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('ab')})}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -1008,7 +1003,7 @@
TEST_CASE("Or", "[expressionOr]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename} ):or(${filename:substring(0, 2):equals('an')})}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -1016,7 +1011,7 @@
TEST_CASE("Or 2", "[expressionOr2]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename} ):or(${filename:substring(0, 2):equals('ab')})}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -1024,7 +1019,7 @@
TEST_CASE("Not", "[expressionNot]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('an')}):not()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("false" == expr({ flow_file_a }).asString());
}
@@ -1032,7 +1027,7 @@
TEST_CASE("Not 2", "[expressionNot2]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename} ):and(${filename:substring(0, 2):equals('ab')}):not()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("true" == expr({ flow_file_a }).asString());
}
@@ -1040,7 +1035,7 @@
TEST_CASE("If Else", "[expressionIfElse]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename}):ifElse('yes', 'no')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "an example file.txt");
REQUIRE("yes" == expr({ flow_file_a }).asString());
}
@@ -1048,7 +1043,7 @@
TEST_CASE("If Else 2", "[expressionIfElse2]") { // NOLINT
auto expr = expression::compile("${filename:toLower():equals( ${filename}):ifElse('yes', 'no')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("filename", "An example file.txt");
REQUIRE("no" == expr({ flow_file_a }).asString());
}
@@ -1056,7 +1051,7 @@
TEST_CASE("Encode JSON", "[expressionEncodeJSON]") { // NOLINT
auto expr = expression::compile("${message:escapeJson()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "This is a \"test!\"");
REQUIRE("This is a \\\"test!\\\"" == expr({ flow_file_a }).asString());
}
@@ -1064,7 +1059,7 @@
TEST_CASE("Decode JSON", "[expressionDecodeJSON]") { // NOLINT
auto expr = expression::compile("${message:unescapeJson()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "This is a \\\"test!\\\"");
REQUIRE("This is a \"test!\"" == expr({ flow_file_a }).asString());
}
@@ -1072,7 +1067,7 @@
TEST_CASE("Encode Decode JSON", "[expressionEncodeDecodeJSON]") { // NOLINT
auto expr = expression::compile("${message:escapeJson():unescapeJson()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "This is a \"test!\"");
REQUIRE("This is a \"test!\"" == expr({ flow_file_a }).asString());
}
@@ -1080,7 +1075,7 @@
TEST_CASE("Encode XML", "[expressionEncodeXML]") { // NOLINT
auto expr = expression::compile("${message:escapeXml()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
REQUIRE("Zero > One < "two!" & 'true'" == expr({ flow_file_a }).asString());
}
@@ -1088,7 +1083,7 @@
TEST_CASE("Decode XML", "[expressionDecodeXML]") { // NOLINT
auto expr = expression::compile("${message:unescapeXml()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "Zero > One < "two!" & 'true'");
REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
}
@@ -1096,7 +1091,7 @@
TEST_CASE("Encode Decode XML", "[expressionEncodeDecodeXML]") { // NOLINT
auto expr = expression::compile("${message:escapeXml():unescapeXml()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
}
@@ -1104,7 +1099,7 @@
TEST_CASE("Encode HTML3", "[expressionEncodeHTML3]") { // NOLINT
auto expr = expression::compile("${message:escapeHtml3()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "¥ & < «");
REQUIRE("¥ & < «" == expr({ flow_file_a }).asString());
}
@@ -1112,7 +1107,7 @@
TEST_CASE("Decode HTML3", "[expressionDecodeHTML3]") { // NOLINT
auto expr = expression::compile("${message:unescapeHtml3()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "¥ & < «");
REQUIRE("¥ & < «" == expr({ flow_file_a }).asString());
}
@@ -1120,7 +1115,7 @@
TEST_CASE("Encode Decode HTML3", "[expressionEncodeDecodeHTML3]") { // NOLINT
auto expr = expression::compile("${message:escapeHtml3():unescapeHtml3()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "¥ & < «");
REQUIRE("¥ & < «" == expr({ flow_file_a }).asString());
}
@@ -1128,7 +1123,7 @@
TEST_CASE("Encode HTML4", "[expressionEncodeHTML4]") { // NOLINT
auto expr = expression::compile("${message:escapeHtml4()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "¥ & Φ < «");
REQUIRE("¥ & Φ < «" == expr({ flow_file_a }).asString());
}
@@ -1136,7 +1131,7 @@
TEST_CASE("Decode HTML4", "[expressionDecodeHTML4]") { // NOLINT
auto expr = expression::compile("${message:unescapeHtml4()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "¥ ι & < «");
REQUIRE("¥ ι & < «" == expr({ flow_file_a }).asString());
}
@@ -1144,7 +1139,7 @@
TEST_CASE("Encode Decode HTML4", "[expressionEncodeDecodeHTML4]") { // NOLINT
auto expr = expression::compile("${message:escapeHtml4():unescapeHtml4()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "¥ & < Π «");
REQUIRE("¥ & < Π «" == expr({ flow_file_a }).asString());
}
@@ -1152,7 +1147,7 @@
TEST_CASE("Encode CSV", "[expressionEncodeCSV]") { // NOLINT
auto expr = expression::compile("${message:escapeCsv()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
REQUIRE("\"Zero > One < \"\"two!\"\" & 'true'\"" == expr({ flow_file_a }).asString());
}
@@ -1160,7 +1155,7 @@
TEST_CASE("Decode CSV", "[expressionDecodeCSV]") { // NOLINT
auto expr = expression::compile("${message:unescapeCsv()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", R"("Zero > One < ""two!"" & 'true'")");
REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
}
@@ -1168,7 +1163,7 @@
TEST_CASE("Decode CSV 2", "[expressionDecodeCSV2]") { // NOLINT
auto expr = expression::compile("${message:unescapeCsv()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", R"("quoted")");
REQUIRE("\"quoted\"" == expr({ flow_file_a }).asString());
}
@@ -1176,7 +1171,7 @@
TEST_CASE("Encode Decode CSV", "[expressionEncodeDecodeCSV]") { // NOLINT
auto expr = expression::compile("${message:escapeCsv():unescapeCsv()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
}
@@ -1186,7 +1181,7 @@
TEST_CASE("Encode URL", "[expressionEncodeURL]") { // NOLINT
auto expr = expression::compile("${message:urlEncode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "some value with spaces");
REQUIRE("some%20value%20with%20spaces" == expr({ flow_file_a }).asString());
}
@@ -1194,7 +1189,7 @@
TEST_CASE("Decode URL", "[expressionDecodeURL]") { // NOLINT
auto expr = expression::compile("${message:urlDecode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "some%20value%20with%20spaces");
REQUIRE("some value with spaces" == expr({ flow_file_a }).asString());
}
@@ -1202,7 +1197,7 @@
TEST_CASE("Encode Decode URL", "[expressionEncodeDecodeURL]") { // NOLINT
auto expr = expression::compile("${message:urlEncode():urlDecode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "some value with spaces");
REQUIRE("some value with spaces" == expr({ flow_file_a }).asString());
}
@@ -1210,7 +1205,7 @@
TEST_CASE("Encode URL", "[expressionEncodeURLExcept]") { // NOLINT
auto expr = expression::compile("${message:urlEncode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "some value with spaces");
REQUIRE_THROWS(expr({flow_file_a}).asString());
}
@@ -1218,7 +1213,7 @@
TEST_CASE("Decode URL", "[expressionDecodeURLExcept]") { // NOLINT
auto expr = expression::compile("${message:urlDecode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "some%20value%20with%20spaces");
REQUIRE_THROWS(expr({flow_file_a}).asString());
}
@@ -1226,7 +1221,7 @@
TEST_CASE("Encode Decode URL", "[expressionEncodeDecodeURLExcept]") { // NOLINT
auto expr = expression::compile("${message:urlEncode():urlDecode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "some value with spaces");
REQUIRE_THROWS(expr({flow_file_a}).asString());
}
@@ -1238,7 +1233,7 @@
TEST_CASE("Parse Date", "[expressionParseDate]") { // NOLINT
auto expr = expression::compile("${message:toDate('%Y/%m/%d', 'America/Los_Angeles')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "2014/04/30");
REQUIRE("1398841200000" == expr({ flow_file_a }).asString());
}
@@ -1246,7 +1241,7 @@
TEST_CASE("Format Date", "[expressionFormatDate]") { // NOLINT
auto expr = expression::compile("${message:format('%m-%d-%Y', 'GMT')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "1394755200000");
REQUIRE("03-14-2014" == expr({ flow_file_a }).asString());
}
@@ -1254,7 +1249,7 @@
TEST_CASE("Reformat Date", "[expressionReformatDate]") { // NOLINT
auto expr = expression::compile("${message:toDate('%Y/%m/%d', 'GMT'):format('%m-%d-%Y', 'America/New_York')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "2014/03/14");
REQUIRE("03-13-2014" == expr({ flow_file_a }).asString());
}
@@ -1262,7 +1257,7 @@
TEST_CASE("Now Date", "[expressionNowDate]") { // NOLINT
auto expr = expression::compile("${now():format('%Y')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "2014/03/14");
time_t t = time(nullptr);
struct tm lt;
@@ -1276,28 +1271,28 @@
TEST_CASE("IP", "[expressionIP]") { // NOLINT
auto expr = expression::compile("${ip()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
- REQUIRE("" != expr({ flow_file_a }).asString());
+ auto flow_file_a = std::make_shared<core::FlowFile>();
+ REQUIRE("" != expr( { flow_file_a }).asString());
}
TEST_CASE("Full Hostname", "[expressionFullHostname]") { // NOLINT
auto expr = expression::compile("${hostname('true')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
- REQUIRE("" != expr({ flow_file_a }).asString());
+ auto flow_file_a = std::make_shared<core::FlowFile>();
+ REQUIRE("" != expr( { flow_file_a }).asString());
}
TEST_CASE("UUID", "[expressionUuid]") { // NOLINT
auto expr = expression::compile("${UUID()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
- REQUIRE(36 == expr({ flow_file_a }).asString().length());
+ auto flow_file_a = std::make_shared<core::FlowFile>();
+ REQUIRE(36 == expr( { flow_file_a }).asString().length());
}
TEST_CASE("Trim", "[expressionTrim]") { // NOLINT
auto expr = expression::compile("${message:trim()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", " 1 2 3 ");
REQUIRE("1 2 3" == expr({ flow_file_a }).asString());
}
@@ -1305,7 +1300,7 @@
TEST_CASE("Append", "[expressionAppend]") { // NOLINT
auto expr = expression::compile("${message:append('.gz')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "a brand new filename.txt");
REQUIRE("a brand new filename.txt.gz" == expr({ flow_file_a }).asString());
}
@@ -1313,7 +1308,7 @@
TEST_CASE("Prepend", "[expressionPrepend]") { // NOLINT
auto expr = expression::compile("${message:prepend('a brand new ')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "filename.txt");
REQUIRE("a brand new filename.txt" == expr({ flow_file_a }).asString());
}
@@ -1321,7 +1316,7 @@
TEST_CASE("Length", "[expressionLength]") { // NOLINT
auto expr = expression::compile("${message:length()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "a brand new filename.txt");
REQUIRE(24 == expr({ flow_file_a }).asUnsignedLong());
}
@@ -1329,7 +1324,7 @@
TEST_CASE("Encode B64", "[expressionEncodeB64]") { // NOLINT
auto expr = expression::compile("${message:base64Encode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "admin:admin");
REQUIRE("YWRtaW46YWRtaW4=" == expr({ flow_file_a }).asString());
}
@@ -1337,7 +1332,7 @@
TEST_CASE("Decode B64", "[expressionDecodeB64]") { // NOLINT
auto expr = expression::compile("${message:base64Decode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "YWRtaW46YWRtaW4=");
REQUIRE("admin:admin" == expr({ flow_file_a }).asString());
}
@@ -1345,7 +1340,7 @@
TEST_CASE("Encode Decode B64", "[expressionEncodeDecodeB64]") { // NOLINT
auto expr = expression::compile("${message:base64Encode():base64Decode()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("message", "Zero > One < \"two!\" & 'true'");
REQUIRE("Zero > One < \"two!\" & 'true'" == expr({ flow_file_a }).asString());
}
@@ -1353,7 +1348,7 @@
TEST_CASE("All Contains", "[expressionAllContains]") { // NOLINT
auto expr = expression::compile("${allAttributes('a', 'b'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "hello 1");
flow_file_a->addAttribute("b", "hello 2");
REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1362,7 +1357,7 @@
TEST_CASE("All Contains 2", "[expressionAllContains2]") { // NOLINT
auto expr = expression::compile("${allAttributes('a', 'b'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "hello 1");
flow_file_a->addAttribute("b", "mello 2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1371,7 +1366,7 @@
TEST_CASE("Any Contains", "[expressionAnyContains]") { // NOLINT
auto expr = expression::compile("${anyAttribute('a', 'b'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "hello 1");
flow_file_a->addAttribute("b", "mello 2");
REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1380,7 +1375,7 @@
TEST_CASE("Any Contains 2", "[expressionAnyContains2]") { // NOLINT
auto expr = expression::compile("${anyAttribute('a', 'b'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "mello 1");
flow_file_a->addAttribute("b", "mello 2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1391,7 +1386,7 @@
TEST_CASE("All Matching Contains", "[expressionAllMatchingContains]") { // NOLINT
auto expr = expression::compile("${allMatchingAttributes('xyz_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "hello 1");
flow_file_a->addAttribute("xyz_2", "hello 2");
REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1400,7 +1395,7 @@
TEST_CASE("All Matching Contains 2", "[expressionAllMatchingContains2]") { // NOLINT
auto expr = expression::compile("${allMatchingAttributes('abc_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "hello 1");
flow_file_a->addAttribute("xyz_2", "hello 2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1409,7 +1404,7 @@
TEST_CASE("All Matching Contains 3", "[expressionAllMatchingContains3]") { // NOLINT
auto expr = expression::compile("${allMatchingAttributes('abc_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "hello 1");
flow_file_a->addAttribute("abc_2", "hello 2");
REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1418,7 +1413,7 @@
TEST_CASE("All Matching Contains 4", "[expressionAllMatchingContains4]") { // NOLINT
auto expr = expression::compile("${allMatchingAttributes('xyz_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "hello 1");
flow_file_a->addAttribute("xyz_2", "2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1427,7 +1422,7 @@
TEST_CASE("Any Matching Contains", "[expressionAnyMatchingContains]") { // NOLINT
auto expr = expression::compile("${anyMatchingAttribute('xyz_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "hello 1");
flow_file_a->addAttribute("xyz_2", "mello 2");
REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1436,7 +1431,7 @@
TEST_CASE("Any Matching Contains 2", "[expressionAnyMatchingContains2]") { // NOLINT
auto expr = expression::compile("${anyMatchingAttribute('abc_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "hello 1");
flow_file_a->addAttribute("xyz_2", "mello 2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1445,7 +1440,7 @@
TEST_CASE("Any Matching Contains 3", "[expressionAnyMatchingContains3]") { // NOLINT
auto expr = expression::compile("${anyMatchingAttribute('abc_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("abc_1", "hello 1");
flow_file_a->addAttribute("xyz_2", "mello 2");
REQUIRE(expr({ flow_file_a }).asBoolean());
@@ -1454,7 +1449,7 @@
TEST_CASE("Any Matching Contains 4", "[expressionAnyMatchingContains4]") { // NOLINT
auto expr = expression::compile("${anyMatchingAttribute('abc_.*'):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("xyz_1", "mello 1");
flow_file_a->addAttribute("xyz_2", "mello 2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
@@ -1465,7 +1460,7 @@
TEST_CASE("All Delineated Contains", "[expressionAllDelineatedContains]") { // NOLINT
auto expr = expression::compile("${allDelineatedValues(${word_list}, \",\"):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("word_list", "hello_1,hello_2");
REQUIRE(expr({ flow_file_a }).asBoolean());
}
@@ -1473,7 +1468,7 @@
TEST_CASE("All Delineated Contains 2", "[expressionAllDelineatedContains2]") { // NOLINT
auto expr = expression::compile("${allDelineatedValues(${word_list}, \",\"):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("word_list", "hello_1,mello_2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
}
@@ -1481,7 +1476,7 @@
TEST_CASE("All Delineated Contains 3", "[expressionAllDelineatedContains3]") { // NOLINT
auto expr = expression::compile("${allDelineatedValues(${word_list}, \" \"):contains('1,h')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("word_list", "hello_1,hello_2");
REQUIRE(expr({ flow_file_a }).asBoolean());
}
@@ -1489,7 +1484,7 @@
TEST_CASE("Any Delineated Contains", "[expressionAnyDelineatedContains]") { // NOLINT
auto expr = expression::compile("${anyDelineatedValue(${word_list}, \",\"):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("word_list", "hello_1,mello_2");
REQUIRE(expr({ flow_file_a }).asBoolean());
}
@@ -1497,7 +1492,7 @@
TEST_CASE("Any Delineated Contains 2", "[expressionAnyDelineatedContains2]") { // NOLINT
auto expr = expression::compile("${anyDelineatedValue(${word_list}, \",\"):contains('hello')}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("word_list", "mello_1,mello_2");
REQUIRE(!expr({ flow_file_a }).asBoolean());
}
@@ -1505,7 +1500,7 @@
TEST_CASE("Count", "[expressionCount]") { // NOLINT
auto expr = expression::compile("${allAttributes('a', 'b'):contains('hello'):count()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "hello 1");
flow_file_a->addAttribute("b", "mello 2");
REQUIRE(1 == expr({ flow_file_a }).asUnsignedLong());
@@ -1514,7 +1509,7 @@
TEST_CASE("Count 2", "[expressionCount2]") { // NOLINT
auto expr = expression::compile("${allAttributes('a', 'b'):contains('mello'):count()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "mello 1");
flow_file_a->addAttribute("b", "mello 2");
flow_file_a->addAttribute("c", "hello 3");
@@ -1524,7 +1519,7 @@
TEST_CASE("Count 3", "[expressionCount3]") { // NOLINT
auto expr = expression::compile("abc${allAttributes('a', 'b'):contains('mello'):count()}xyz");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "mello 1");
flow_file_a->addAttribute("b", "mello 2");
flow_file_a->addAttribute("c", "hello 3");
@@ -1534,7 +1529,7 @@
TEST_CASE("Join", "[expressionJoin]") { // NOLINT
auto expr = expression::compile("abc_${allAttributes('a', 'b'):prepend('def_'):append('_ghi'):join(\"|\")}_xyz");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "hello");
flow_file_a->addAttribute("b", "mello");
REQUIRE("abc_def_hello_ghi|def_mello_ghi_xyz" == expr({ flow_file_a }).asString());
@@ -1543,7 +1538,7 @@
TEST_CASE("Join 2", "[expressionJoin2]") { // NOLINT
auto expr = expression::compile("abc_${allAttributes('a', 'b'):join(\"|\"):prepend('def_'):append('_ghi')}_xyz");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
flow_file_a->addAttribute("a", "hello");
flow_file_a->addAttribute("b", "mello");
REQUIRE("abc_def_hello|mello_ghi_xyz" == expr({ flow_file_a }).asString());
@@ -1552,7 +1547,7 @@
TEST_CASE("resolve_user_id_test", "[resolve_user_id tests]") { // NOLINT
auto expr = expression::compile("${attribute_sid:resolve_user_id()}");
- auto flow_file_a = std::make_shared<MockFlowFile>();
+ auto flow_file_a = std::make_shared<core::FlowFile>();
SECTION("TEST 0"){
flow_file_a->addAttribute("attribute_sid", "0");
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 62f63aa..c3042f6 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -54,6 +54,7 @@
}
return is_valid_;
}
+
void DatabaseContentRepository::stop() {
if (db_) {
auto opendb = db_->open();
@@ -64,54 +65,54 @@
db_.reset();
}
-std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
// the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
- if (nullptr == claim || !is_valid_ || !db_)
+ if (!is_valid_ || !db_)
return nullptr;
// append is already supported in all modes
- return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
+ return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true);
}
-std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::ResourceClaim &claim) {
// the traditional approach with these has been to return -1 from the stream; however, since we have the ability here
// we can simply return a nullptr, which is also valid from the API when this stream is not valid.
- if (nullptr == claim || !is_valid_ || !db_)
+ if (!is_valid_ || !db_)
return nullptr;
- return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
+ return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), false);
}
-bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
auto opendb = db_->open();
if (!opendb) {
return false;
}
std::string value;
rocksdb::Status status;
- status = opendb->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value);
+ status = opendb->Get(rocksdb::ReadOptions(), streamId.getContentFullPath(), &value);
if (status.ok()) {
- logger_->log_debug("%s exists", streamId->getContentFullPath());
+ logger_->log_debug("%s exists", streamId.getContentFullPath());
return true;
} else {
- logger_->log_debug("%s does not exist", streamId->getContentFullPath());
+ logger_->log_debug("%s does not exist", streamId.getContentFullPath());
return false;
}
}
-bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
- if (nullptr == claim || !is_valid_ || !db_)
+bool DatabaseContentRepository::remove(const minifi::ResourceClaim &claim) {
+ if (!is_valid_ || !db_)
return false;
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::Status status;
- status = opendb->Delete(rocksdb::WriteOptions(), claim->getContentFullPath());
+ status = opendb->Delete(rocksdb::WriteOptions(), claim.getContentFullPath());
if (status.ok()) {
- logger_->log_debug("Deleted %s", claim->getContentFullPath());
+ logger_->log_debug("Deleted %s", claim.getContentFullPath());
return true;
} else {
- logger_->log_debug("Attempted, but could not delete %s", claim->getContentFullPath());
+ logger_->log_debug("Attempted, but could not delete %s", claim.getContentFullPath());
return false;
}
}
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 85e2f7d..6abc476 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -86,17 +86,17 @@
virtual void stop();
- virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
+ virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false);
- virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
- virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ virtual bool close(const minifi::ResourceClaim &claim) {
return remove(claim);
}
- virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual bool remove(const minifi::ResourceClaim &claim);
- virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+ virtual bool exists(const minifi::ResourceClaim &streamId);
virtual void yield() {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 57d3ab6..b45647c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -87,9 +87,7 @@
if (content_repo_) {
for (const auto &ffr : purgeList) {
auto claim = ffr->getResourceClaim();
- if (claim) {
- content_repo_->removeIfOrphaned(claim);
- }
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
}
}
}
@@ -165,14 +163,15 @@
found = (search != connectionMap.end());
}
if (found) {
- // we find the connection for the persistent flowfile, create the flowfile and enqueue that
- std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead);
eventRead->setStoredToRepository(true);
+ // we found the connection for the persistent flowFile
+ auto claim = eventRead->getResourceClaim();
+ // on behalf of the just resurrected persisted instance
+ if (claim) claim->increaseFlowFileRecordOwnedCount();
+ // even if a processor immediately marks it for deletion, flush only happens after prune_stored_flowfiles
search->second->put(eventRead);
} else {
logger_->log_warn("Could not find connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
- auto claim = eventRead->getResourceClaim();
- if (claim) claim->decreaseFlowFileRecordOwnedCount();
keys_to_delete.enqueue(key);
}
} else {
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index b3cfc2b..e69fd19 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -182,6 +182,7 @@
if (running_) {
return;
}
+ content_repo_->reset();
running_ = true;
thread_ = std::thread(&FlowFileRepository::run, shared_from_this());
logger_->log_debug("%s Repository Monitor Thread Start", getName());
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 7557056..7c80aef 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -163,11 +163,6 @@
return claim_ ? claim_->getContentFullPath() : "";
}
- /**
- * Cleanly relinquish a resource claim
- */
- virtual void releaseClaim(std::shared_ptr<ResourceClaim> claim);
-
FlowFileRecord &operator=(const FlowFileRecord &);
FlowFileRecord(const FlowFileRecord &parent) = delete;
diff --git a/libminifi/include/ResourceClaim.h b/libminifi/include/ResourceClaim.h
index d773c3d..9a4070d 100644
--- a/libminifi/include/ResourceClaim.h
+++ b/libminifi/include/ResourceClaim.h
@@ -45,8 +45,10 @@
extern void setDefaultDirectory(std::string);
// ResourceClaim Class
-class ResourceClaim : public std::enable_shared_from_this<ResourceClaim> {
+class ResourceClaim {
public:
+ // the type which uniquely represents the resource for the owning manager
+ using Path = std::string;
// Constructor
/*!
* Create a new resource claim
@@ -55,42 +57,32 @@
explicit ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager);
- explicit ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted = false);
+ explicit ResourceClaim(const Path& path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager);
// Destructor
- ~ResourceClaim() = default;
+ ~ResourceClaim();
// increaseFlowFileRecordOwnedCount
void increaseFlowFileRecordOwnedCount() {
- claim_manager_->incrementStreamCount(shared_from_this());
+ claim_manager_->incrementStreamCount(*this);
}
// decreaseFlowFileRecordOwenedCount
void decreaseFlowFileRecordOwnedCount() {
- claim_manager_->decrementStreamCount(shared_from_this());
+ claim_manager_->decrementStreamCount(*this);
}
// getFlowFileRecordOwenedCount
uint64_t getFlowFileRecordOwnedCount() {
- return claim_manager_->getStreamCount(shared_from_this());
+ return claim_manager_->getStreamCount(*this);
}
// Get the content full path
- std::string getContentFullPath() {
+ Path getContentFullPath() const {
return _contentFullPath;
}
- // Set the content full path
- void setContentFullPath(std::string path) {
- _contentFullPath = path;
- }
-
- void deleteClaim() {
- if (!deleted_) {
- deleted_ = true;
- }
- }
bool exists() {
if (claim_manager_ == nullptr) {
return false;
}
- return claim_manager_->exists(shared_from_this());
+ return claim_manager_->exists(*this);
}
friend std::ostream& operator<<(std::ostream& stream, const ResourceClaim& claim) {
@@ -104,9 +96,8 @@
}
protected:
- std::atomic<bool> deleted_;
// Full path to the content
- std::string _contentFullPath;
+ const Path _contentFullPath;
std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager_;
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 2fca0a3..2b9bbae 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -56,30 +56,14 @@
*/
virtual void stop() = 0;
- /**
- * Removes an item if it was orphan
- */
- virtual bool removeIfOrphaned(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+ void reset() {
std::lock_guard<std::mutex> lock(count_map_mutex_);
- const std::string str = streamId->getContentFullPath();
- auto count = count_map_.find(str);
- if (count != count_map_.end()) {
- if (count_map_[str] == 0) {
- remove(streamId);
- count_map_.erase(str);
- return true;
- } else {
- return false;
- }
- } else {
- remove(streamId);
- return true;
- }
+ count_map_.clear();
}
- virtual uint32_t getStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+ virtual uint32_t getStreamCount(const minifi::ResourceClaim &streamId) {
std::lock_guard<std::mutex> lock(count_map_mutex_);
- auto cnt = count_map_.find(streamId->getContentFullPath());
+ auto cnt = count_map_.find(streamId.getContentFullPath());
if (cnt != count_map_.end()) {
return cnt->second;
} else {
@@ -87,9 +71,9 @@
}
}
- virtual void incrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+ virtual void incrementStreamCount(const minifi::ResourceClaim &streamId) {
std::lock_guard<std::mutex> lock(count_map_mutex_);
- const std::string str = streamId->getContentFullPath();
+ const std::string str = streamId.getContentFullPath();
auto count = count_map_.find(str);
if (count != count_map_.end()) {
count_map_[str] = count->second + 1;
@@ -98,14 +82,17 @@
}
}
- virtual void decrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
+ virtual StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) {
std::lock_guard<std::mutex> lock(count_map_mutex_);
- const std::string str = streamId->getContentFullPath();
+ const std::string str = streamId.getContentFullPath();
auto count = count_map_.find(str);
- if (count != count_map_.end() && count->second > 0) {
+ if (count != count_map_.end() && count->second > 1) {
count_map_[str] = count->second - 1;
+ return StreamState::Alive;
} else {
count_map_.erase(str);
+ remove(streamId);
+ return StreamState::Deleted;
}
}
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index a546953..34daa65 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -36,64 +36,6 @@
namespace core {
class FlowFile : public core::Connectable, public ReferenceContainer {
- private:
- class FlowFileOwnedResourceClaimPtr{
- public:
- FlowFileOwnedResourceClaimPtr() = default;
- explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
- if (claim_) claim_->increaseFlowFileRecordOwnedCount();
- }
- explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
- if (claim_) claim_->increaseFlowFileRecordOwnedCount();
- }
- FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
- if (claim_) claim_->increaseFlowFileRecordOwnedCount();
- }
- FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
- // taking ownership of claim, no need to increment/decrement
- }
- FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
- FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
-
- FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
- return set(owner, ref.claim_);
- }
- FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
- auto oldClaim = claim_;
- claim_ = newClaim;
- // the order of increase/release is important
- // with refcount manipulation we should always increment first, then decrement as this way we don't accidentally
- // discard the object under ourselves, note that an equality check will not suffice as two ResourceClaim
- // instances can reference the same file (they could have the same contentPath)
- if (claim_) claim_->increaseFlowFileRecordOwnedCount();
- if (oldClaim) owner.releaseClaim(oldClaim);
- return *this;
- }
- const std::shared_ptr<ResourceClaim>& get() const {
- return claim_;
- }
- const std::shared_ptr<ResourceClaim>& operator->() const {
- return claim_;
- }
- operator bool() const noexcept {
- return static_cast<bool>(claim_);
- }
- ~FlowFileOwnedResourceClaimPtr() {
- // allow the owner FlowFile to manually release the claim
- // while logging stuff and removing it from repositories
- assert(!claim_);
- }
-
- private:
- /*
- * We are aiming for the constraint that all FlowFiles should have a non-null claim pointer,
- * unfortunately, for now, some places (e.g. ProcessSession::create) violate this constraint.
- * We should indicate an empty or invalid content with special claims like
- * InvalidResourceClaim and EmptyResourceClaim.
- */
- std::shared_ptr<ResourceClaim> claim_;
- };
-
public:
FlowFile();
~FlowFile() override;
@@ -136,12 +78,6 @@
bool hasStashClaim(const std::string& key);
/**
- * Decrease the flow file record owned count for the resource claim and, if
- * its counter is at zero, remove it from the repo.
- */
- virtual void releaseClaim(const std::shared_ptr<ResourceClaim> claim) = 0;
-
- /**
* Get lineage identifiers
*/
std::set<std::string>& getlineageIdentifiers();
@@ -373,9 +309,9 @@
// Attributes key/values pairs for the flow record
std::map<std::string, std::string> attributes_;
// Pointer to the associated content resource claim
- FlowFileOwnedResourceClaimPtr claim_;
+ std::shared_ptr<ResourceClaim> claim_;
// Pointers to stashed content resource claims
- std::map<std::string, FlowFileOwnedResourceClaimPtr> stashedContent_;
+ std::map<std::string, std::shared_ptr<ResourceClaim>> stashedContent_;
// UUID string
// std::string uuid_str_;
// UUID string for all parents
diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h
index 57e839f..46a9e20 100644
--- a/libminifi/include/core/StreamManager.h
+++ b/libminifi/include/core/StreamManager.h
@@ -40,6 +40,10 @@
template<typename T>
class StreamManager {
public:
+ enum class StreamState{
+ Deleted,
+ Alive
+ };
virtual ~StreamManager() = default;
virtual std::string getStoragePath() const = 0;
@@ -49,21 +53,21 @@
* @param streamId stream identifier
* @return stream pointer.
*/
- virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<T> &streamId, bool append = false) = 0;
+ virtual std::shared_ptr<io::BaseStream> write(const T &streamId, bool append = false) = 0;
/**
* Create a read stream using the streamId as a reference.
* @param streamId stream identifier
* @return stream pointer.
*/
- virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<T> &streamId) = 0;
+ virtual std::shared_ptr<io::BaseStream> read(const T &streamId) = 0;
/**
* Closes the stream
* @param streamId stream identifier
* @return result of operation.
*/
- virtual bool close(const std::shared_ptr<T> &streamId) = 0;
+ virtual bool close(const T &streamId) = 0;
/**
* Removes the stream from this stream manager. The end result
@@ -71,20 +75,15 @@
* @param streamId stream identifier
* @return result of operation.
*/
- virtual bool remove(const std::shared_ptr<T> &streamId) = 0;
+ virtual bool remove(const T &streamId) = 0;
- /**
- * Removes an item if it was orphan
- */
- virtual bool removeIfOrphaned(const std::shared_ptr<T> &streamId) = 0;
+ virtual uint32_t getStreamCount(const T &streamId) = 0;
- virtual uint32_t getStreamCount(const std::shared_ptr<T> &streamId) = 0;
+ virtual void incrementStreamCount(const T &streamId) = 0;
- virtual void incrementStreamCount(const std::shared_ptr<T> &streamId) = 0;
+ virtual StreamState decrementStreamCount(const T &streamId) = 0;
- virtual void decrementStreamCount(const std::shared_ptr<T> &streamId) = 0;
-
- virtual bool exists(const std::shared_ptr<T> &streamId) = 0;
+ virtual bool exists(const T &streamId) = 0;
};
} // namespace core
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 9439ec3..c2b8c75 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -47,17 +47,17 @@
virtual void stop();
- bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+ bool exists(const minifi::ResourceClaim &streamId);
- virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append = false);
+ virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false);
- virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
- virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ virtual bool close(const minifi::ResourceClaim &claim) {
return remove(claim);
}
- virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual bool remove(const minifi::ResourceClaim &claim);
private:
std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 037f7b8..03ece73 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -41,13 +41,13 @@
* Purpose: Stages content into a volatile area of memory. Note that when the maximum number
* of entries is consumed we will rollback a session to wait for others to be freed.
*/
-class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>> {
+class VolatileContentRepository : public core::ContentRepository, public virtual core::repository::VolatileRepository<ResourceClaim::Path> {
public:
static const char *minimal_locking;
explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>())
: core::SerializableComponent(name),
- core::repository::VolatileRepository<std::shared_ptr<minifi::ResourceClaim>>(name),
+ core::repository::VolatileRepository<ResourceClaim::Path>(name),
minimize_locking_(true),
logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
max_count_ = 15000;
@@ -79,22 +79,22 @@
* @param claim resource claim
* @return BaseStream shared pointer that represents the stream the consumer will write to.
*/
- virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append);
+ virtual std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append);
/**
* Creates readable stream.
* @param claim resource claim
* @return BaseStream shared pointer that represents the stream from which the consumer will read..
*/
- virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim);
- virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId);
+ virtual bool exists(const minifi::ResourceClaim &streamId);
/**
* Closes the claim.
* @return whether or not the claim is associated with content stored in volatile memory.
*/
- virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+ virtual bool close(const minifi::ResourceClaim &claim) {
return remove(claim);
}
@@ -102,7 +102,7 @@
* Closes the claim.
* @return whether or not the claim is associated with content stored in volatile memory.
*/
- virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim);
+ virtual bool remove(const minifi::ResourceClaim &claim);
protected:
virtual void start();
@@ -117,16 +117,11 @@
private:
bool minimize_locking_;
- // function pointers that are associated with the claims.
- std::function<bool(std::shared_ptr<minifi::ResourceClaim>, std::shared_ptr<minifi::ResourceClaim>)> resource_claim_comparator_;
- std::function<bool(std::shared_ptr<minifi::ResourceClaim>)> resource_claim_check_;
- std::function<void(std::shared_ptr<minifi::ResourceClaim>)> claim_reclaimer_;
-
// mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list.
// The idea is to reduce the computational complexity while keeping access as maximally lock free as we can.
std::mutex map_mutex_;
- std::map<std::string, AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>*> master_list_;
+ std::map<ResourceClaim::Path, AtomicEntry<ResourceClaim::Path>*> master_list_;
// logger
std::shared_ptr<logging::Logger> logger_;
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index c276d28..1c65f0e 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -50,20 +50,23 @@
repo_full_ = false;
while (running_) {
std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
- if (purge_required_ && nullptr != content_repo_) {
- std::lock_guard<std::mutex> lock(purge_mutex_);
- for (auto purgeItem : purge_list_) {
- std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
- if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
- std::shared_ptr<minifi::ResourceClaim> newClaim = eventRead->getResourceClaim();
- if (newClaim != nullptr) {
- content_repo_->removeIfOrphaned(newClaim);
- }
- }
+ flush();
+ }
+ flush();
+ }
+
+ virtual void flush() {
+ if (purge_required_ && nullptr != content_repo_) {
+ std::lock_guard<std::mutex> lock(purge_mutex_);
+ for (auto purgeItem : purge_list_) {
+ std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_);
+ if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(purgeItem.data()), purgeItem.size())) {
+ auto claim = eventRead->getResourceClaim();
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
}
- purge_list_.resize(0);
- purge_list_.clear();
}
+ purge_list_.resize(0);
+ purge_list_.clear();
}
}
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index b38d17b..116e29a 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -173,7 +173,6 @@
class NonRepeatingStringGenerator {
public:
- NonRepeatingStringGenerator();
std::string generate() {
return prefix_ + std::to_string(incrementor_++);
}
@@ -181,8 +180,8 @@
return incrementor_++;
}
private:
- std::atomic<uint64_t> incrementor_;
- std::string prefix_;
+ std::atomic<uint64_t> incrementor_{0};
+ std::string prefix_{std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()) + "-"};
};
} // namespace utils
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 358b589..882686c 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -46,7 +46,7 @@
content_repo_(content_repo),
flow_repository_(flow_repository) {
id_ = local_flow_seq_number_.load();
- claim_.set(*this, claim);
+ claim_ = claim;
// Increase the local ID for the flow record
++local_flow_seq_number_;
// Populate the default attributes
@@ -77,7 +77,7 @@
offset_ = event->getOffset();
event->getUUID(uuid_);
uuid_connection_ = uuidConnection;
- claim_.set(*this, event->getResourceClaim());
+ claim_ = event->getResourceClaim();
if (event->getFlowIdentifier()) {
std::string attr;
event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -94,7 +94,7 @@
snapshot_(""),
content_repo_(content_repo),
flow_repository_(flow_repository) {
- claim_.set(*this, event->getResourceClaim());
+ claim_ = event->getResourceClaim();
if (event->getFlowIdentifier()) {
std::string attr;
event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
@@ -115,23 +115,6 @@
if (!claim_) {
logger_->log_debug("Claim is null ptr for %s", uuidStr_);
}
-
- claim_.set(*this, nullptr);
-
- // Disown stash claims
- for (auto &stashPair : stashedContent_) {
- auto& stashClaim = stashPair.second;
- stashClaim.set(*this, nullptr);
- }
-}
-
-void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
- // Decrease the flow file record owned count for the resource claim
- claim->decreaseFlowFileRecordOwnedCount();
- logger_->log_debug("Detaching Resource Claim %s, %s, attempt " "%" PRIu64, getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());
- if (content_repo_ && content_repo_->removeIfOrphaned(claim)) {
- logger_->log_debug("Deleted Resource Claim %s", claim->getContentFullPath());
- }
}
bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, const std::string &value) {
@@ -358,7 +341,7 @@
return false;
}
- claim_.set(*this, std::make_shared<ResourceClaim>(content_full_path, content_repo_, true));
+ claim_ = std::make_shared<ResourceClaim>(content_full_path, content_repo_);
return true;
}
diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp
index 1e08e20..92e39b2 100644
--- a/libminifi/src/ResourceClaim.cpp
+++ b/libminifi/src/ResourceClaim.cpp
@@ -41,22 +41,28 @@
}
ResourceClaim::ResourceClaim(std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager)
- : claim_manager_(claim_manager),
- deleted_(false),
- logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) {
- auto contentDirectory = claim_manager_->getStoragePath();
- if (contentDirectory.empty())
- contentDirectory = default_directory_path;
+ : _contentFullPath([&] {
+ auto contentDirectory = claim_manager->getStoragePath();
+ if (contentDirectory.empty())
+ contentDirectory = default_directory_path;
- // Create the full content path for the content
- _contentFullPath = contentDirectory + "/" + non_repeating_string_generator_.generate();
+ // Create the full content path for the content
+ return contentDirectory + "/" + non_repeating_string_generator_.generate();
+ }()),
+ claim_manager_(std::move(claim_manager)),
+ logger_(logging::LoggerFactory<ResourceClaim>::getLogger()) {
+ if (claim_manager_) increaseFlowFileRecordOwnedCount();
logger_->log_debug("Resource Claim created %s", _contentFullPath);
}
-ResourceClaim::ResourceClaim(const std::string path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager, bool deleted)
- : claim_manager_(claim_manager),
- deleted_(deleted) {
- _contentFullPath = path;
+ResourceClaim::ResourceClaim(const Path& path, std::shared_ptr<core::StreamManager<ResourceClaim>> claim_manager)
+ : _contentFullPath(path),
+ claim_manager_(std::move(claim_manager)) {
+ if (claim_manager_) increaseFlowFileRecordOwnedCount();
+}
+
+ResourceClaim::~ResourceClaim() {
+ if (claim_manager_) decreaseFlowFileRecordOwnedCount();
}
} /* namespace minifi */
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index f5765f8..095c601 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -64,7 +64,7 @@
size_ = other.size_;
penaltyExpiration_ms_ = other.penaltyExpiration_ms_;
attributes_ = other.attributes_;
- claim_.set(*this, other.claim_);
+ claim_ = other.claim_;
uuidStr_ = other.uuidStr_;
connection_ = other.connection_;
original_connection_ = other.original_connection_;
@@ -93,18 +93,18 @@
}
std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() {
- return claim_.get();
+ return claim_;
}
void FlowFile::clearResourceClaim() {
- claim_.set(*this, nullptr);
+ claim_ = nullptr;
}
void FlowFile::setResourceClaim(const std::shared_ptr<ResourceClaim>& claim) {
- claim_.set(*this, claim);
+ claim_ = claim;
}
std::shared_ptr<ResourceClaim> FlowFile::getStashClaim(const std::string& key) {
- return stashedContent_[key].get();
+ return stashedContent_[key];
}
void FlowFile::setStashClaim(const std::string& key, const std::shared_ptr<ResourceClaim>& claim) {
@@ -114,13 +114,13 @@
getUUIDStr().c_str(), key.c_str());
}
- stashedContent_[key].set(*this, claim);
+ stashedContent_[key] = claim;
}
void FlowFile::clearStashClaim(const std::string& key) {
auto claimIt = stashedContent_.find(key);
if (claimIt != stashedContent_.end()) {
- claimIt->second.set(*this, nullptr);
+ claimIt->second = nullptr;
stashedContent_.erase(claimIt);
}
}
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index c6de75a..eb977ca 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -245,7 +245,7 @@
try {
uint64_t startTime = utils::timeutils::getTimeMillis();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
// Call the callback to write the content
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
@@ -280,7 +280,7 @@
try {
uint64_t startTime = utils::timeutils::getTimeMillis();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim, true);
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim, true);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for append");
}
@@ -323,7 +323,7 @@
claim = flow->getResourceClaim();
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim);
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(*claim);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
@@ -356,7 +356,7 @@
try {
auto startTime = utils::timeutils::getTimeMillis();
- std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(claim);
+ std::shared_ptr<io::BaseStream> content_stream = process_context_->getContentRepository()->write(*claim);
if (nullptr == content_stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
@@ -402,7 +402,7 @@
auto startTime = utils::timeutils::getTimeMillis();
std::ifstream input;
input.open(source.c_str(), std::fstream::in | std::fstream::binary);
- std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim);
+ std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(*claim);
if (nullptr == stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open new flowfile content for write");
}
@@ -519,7 +519,7 @@
claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
}
if (stream == nullptr) {
- stream = process_context_->getContentRepository()->write(claim);
+ stream = process_context_->getContentRepository()->write(*claim);
}
if (stream == nullptr) {
logger_->log_error("Stream is null");
@@ -939,9 +939,7 @@
if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
// original must be non-null since this flowFile is already stored in the repos ->
// must have come from a session->get()
- auto claim = original->getResourceClaim();
- // decrement on behalf of the persisted-instance-to-be-deleted
- if (claim) claim->decreaseFlowFileRecordOwnedCount();
+ assert(original);
ff->setStoredToRepository(false);
}
continue;
@@ -951,7 +949,7 @@
if (claim) claim->increaseFlowFileRecordOwnedCount();
auto originalClaim = original ? original->getResourceClaim() : nullptr;
// decrement on behalf of the overridden instance if any
- if (ff->isStored() && originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
+ if (originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
ff->setStoredToRepository(true);
}
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index 4f9b83d..ea89cdc 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -42,21 +42,22 @@
void FileSystemRepository::stop() {
}
-std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
- return std::make_shared<io::FileStream>(claim->getContentFullPath(), append);
+std::shared_ptr<io::BaseStream> FileSystemRepository::write(const minifi::ResourceClaim &claim, bool append) {
+ return std::make_shared<io::FileStream>(claim.getContentFullPath(), append);
}
-bool FileSystemRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) {
- std::ifstream file(streamId->getContentFullPath());
+bool FileSystemRepository::exists(const minifi::ResourceClaim &streamId) {
+ std::ifstream file(streamId.getContentFullPath());
return file.good();
}
-std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
- return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false);
+std::shared_ptr<io::BaseStream> FileSystemRepository::read(const minifi::ResourceClaim &claim) {
+ return std::make_shared<io::FileStream>(claim.getContentFullPath(), 0, false);
}
-bool FileSystemRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
- std::remove(claim->getContentFullPath().c_str());
+bool FileSystemRepository::remove(const minifi::ResourceClaim &claim) {
+ logger_->log_debug("Deleting resource %s", claim.getContentFullPath());
+ std::remove(claim.getContentFullPath().c_str());
return true;
}
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index d626354..d7039cf 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -38,17 +38,6 @@
bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {
VolatileRepository::initialize(configure);
- resource_claim_comparator_ = [](std::shared_ptr<minifi::ResourceClaim> lhsPtr, std::shared_ptr<minifi::ResourceClaim> rhsPtr) {
- if (lhsPtr == nullptr || rhsPtr == nullptr) {
- return false;
- }
- return lhsPtr->getContentFullPath() == rhsPtr->getContentFullPath();};
- resource_claim_check_ = [](std::shared_ptr<minifi::ResourceClaim> claim) {
- return claim->getFlowFileRecordOwnedCount() <= 0;};
- claim_reclaimer_ = [&](std::shared_ptr<minifi::ResourceClaim> claim) {if (claim->getFlowFileRecordOwnedCount() <= 0) {
- remove(claim);
- }
- };
if (configure != nullptr) {
bool minimize_locking = false;
@@ -89,52 +78,52 @@
logger_->log_info("%s Repository Monitor Thread Start", getName());
}
-std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim, bool append) {
- logger_->log_info("enter write for %s", claim->getContentFullPath());
+std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
+ logger_->log_info("enter write for %s", claim.getContentFullPath());
{
std::lock_guard<std::mutex> lock(map_mutex_);
- auto claim_check = master_list_.find(claim->getContentFullPath());
+ auto claim_check = master_list_.find(claim.getContentFullPath());
if (claim_check != master_list_.end()) {
logger_->log_info("Creating copy of atomic entry");
auto ent = claim_check->second->takeOwnership();
if (ent == nullptr) {
return nullptr;
}
- return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
}
}
int size = 0;
if (LIKELY(minimize_locking_ == true)) {
for (auto ent : value_vector_) {
- if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
+ if (ent->testAndSetKey(claim.getContentFullPath())) {
std::lock_guard<std::mutex> lock(map_mutex_);
- master_list_[claim->getContentFullPath()] = ent;
- logger_->log_info("Minimize locking, return stream for %s", claim->getContentFullPath());
- return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ master_list_[claim.getContentFullPath()] = ent;
+ logger_->log_info("Minimize locking, return stream for %s", claim.getContentFullPath());
+ return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
}
size++;
}
} else {
std::lock_guard<std::mutex> lock(map_mutex_);
- auto claim_check = master_list_.find(claim->getContentFullPath());
+ auto claim_check = master_list_.find(claim.getContentFullPath());
if (claim_check != master_list_.end()) {
- return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second);
+ return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), claim_check->second);
} else {
- AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(¤t_size_, &max_size_);
- if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) {
- master_list_[claim->getContentFullPath()] = ent;
- return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ auto *ent = new AtomicEntry<ResourceClaim::Path>(¤t_size_, &max_size_);
+ if (ent->testAndSetKey(claim.getContentFullPath())) {
+ master_list_[claim.getContentFullPath()] = ent;
+ return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
}
}
}
- logger_->log_info("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim->getContentFullPath(), size);
+ logger_->log_info("Cannot write %s %d, returning nullptr to roll back session. Repo is either full or locked", claim.getContentFullPath(), size);
return nullptr;
}
-bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+bool VolatileContentRepository::exists(const minifi::ResourceClaim &claim) {
std::lock_guard<std::mutex> lock(map_mutex_);
- auto claim_check = master_list_.find(claim->getContentFullPath());
+ auto claim_check = master_list_.find(claim.getContentFullPath());
if (claim_check != master_list_.end()) {
auto ent = claim_check->second->takeOwnership();
if (ent == nullptr) {
@@ -146,53 +135,53 @@
return false;
}
-std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const minifi::ResourceClaim &claim) {
std::lock_guard<std::mutex> lock(map_mutex_);
- auto claim_check = master_list_.find(claim->getContentFullPath());
+ auto claim_check = master_list_.find(claim.getContentFullPath());
if (claim_check != master_list_.end()) {
auto ent = claim_check->second->takeOwnership();
if (ent == nullptr) {
return nullptr;
}
- return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+ return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
}
return nullptr;
}
-bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) {
+bool VolatileContentRepository::remove(const minifi::ResourceClaim &claim) {
if (LIKELY(minimize_locking_ == true)) {
std::lock_guard<std::mutex> lock(map_mutex_);
- auto ent = master_list_.find(claim->getContentFullPath());
+ auto ent = master_list_.find(claim.getContentFullPath());
if (ent != master_list_.end()) {
auto ptr = ent->second;
// if we cannot remove the entry we will let the owner's destructor
// decrement the reference count and free it
- master_list_.erase(claim->getContentFullPath());
+ master_list_.erase(claim.getContentFullPath());
// because of the test and set we need to decrement ownership
ptr->decrementOwnership();
- if (ptr->freeValue(claim)) {
- logger_->log_info("Removed %s", claim->getContentFullPath());
+ if (ptr->freeValue(claim.getContentFullPath())) {
+ logger_->log_info("Deleting resource %s", claim.getContentFullPath());
return true;
} else {
- logger_->log_info("free failed for %s", claim->getContentFullPath());
+ logger_->log_info("free failed for %s", claim.getContentFullPath());
}
} else {
- logger_->log_info("Could not remove %s", claim->getContentFullPath());
+ logger_->log_info("Could not remove %s", claim.getContentFullPath());
}
} else {
std::lock_guard<std::mutex> lock(map_mutex_);
- auto claim_item = master_list_.find(claim->getContentFullPath());
+ auto claim_item = master_list_.find(claim.getContentFullPath());
if (claim_item != master_list_.end()) {
auto size = claim_item->second->getLength();
delete claim_item->second;
- master_list_.erase(claim->getContentFullPath());
+ master_list_.erase(claim.getContentFullPath());
current_size_ -= size;
}
return true;
}
- logger_->log_info("Could not remove %s, may not exist", claim->getContentFullPath());
+ logger_->log_info("Could not remove %s, may not exist", claim.getContentFullPath());
return false;
}
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index 79c8f7d..c90a39d 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -169,13 +169,6 @@
converted_ = uuidStr;
}
-uint64_t timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
-
-NonRepeatingStringGenerator::NonRepeatingStringGenerator()
- : prefix_((std::to_string(timestamp) + "-")),
- incrementor_(0) {
-}
-
IdGenerator::IdGenerator()
: implementation_(UUID_TIME_IMPL),
logger_(logging::LoggerFactory<IdGenerator>::getLogger()),
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 4844729..1ba607b 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -33,12 +33,13 @@
#include "../TestBase.h"
#include "../../extensions/libarchive/MergeContent.h"
#include "../test/BufferReader.h"
+#include "core/repository/VolatileFlowFileRepository.h"
using Connection = minifi::Connection;
using MergeContent = minifi::processors::MergeContent;
struct TestFlow{
- TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
+ TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
const std::function<std::shared_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
: ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
// setup processor
@@ -119,7 +120,7 @@
std::shared_ptr<core::Processor> inputProcessor;
std::shared_ptr<core::Processor> processor;
- std::shared_ptr<core::repository::FlowFileRepository> ff_repository;
+ std::shared_ptr<core::Repository> ff_repository;
std::shared_ptr<core::ContentRepository> content_repo;
std::shared_ptr<core::Repository> prov_repo;
std::shared_ptr<core::ProcessContext> inputContext;
@@ -244,8 +245,11 @@
TestController testController;
LogTestController::getInstance().setDebug<core::ContentRepository>();
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileContentRepository>();
LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+ LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
+ LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
char format[] = "/var/tmp/test.XXXXXX";
auto dir = testController.createTempDirectory(format);
@@ -255,8 +259,16 @@
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+ std::shared_ptr<core::ContentRepository> content_repo;
+ SECTION("VolatileContentRepository") {
+ testController.getLogger()->log_info("Using VolatileContentRepository");
+ content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+ }
+ SECTION("FileSystemContentRepository") {
+ testController.getLogger()->log_info("Using FileSystemRepository");
+ content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ }
ff_repository->initialize(config);
content_repo->initialize(config);
@@ -269,23 +281,29 @@
flowController->load(flow.root);
ff_repository->start();
- // write two files into the input
- auto flowFile = flow.write("data");
- auto claim = flowFile->getResourceClaim();
- // one from the FlowFile and one from the persisted instance
- REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
- // update them with the Merge Processor
- flow.trigger();
+ std::string removedResource;
+ {
+ // write two files into the input
+ auto flowFile = flow.write("data");
+ auto claim = flowFile->getResourceClaim();
+ removedResource = claim->getContentFullPath();
+ // one from the FlowFile and one from the persisted instance
+ REQUIRE(claim->getFlowFileRecordOwnedCount() == 2);
+ // update them with the Merge Processor
+ flow.trigger();
- auto content = flow.read(flowFile);
- REQUIRE(content == "<override>");
- auto newClaim = flowFile->getResourceClaim();
- // the processor added new content to the flowFile
- REQUIRE(claim != newClaim);
- // nobody holds an owning reference to the previous claim
- REQUIRE(claim->getFlowFileRecordOwnedCount() == 0);
- // one from the FlowFile and one from the persisted instance
- REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
+ auto content = flow.read(flowFile);
+ REQUIRE(content == "<override>");
+ auto newClaim = flowFile->getResourceClaim();
+ // the processor added new content to the flowFile
+ REQUIRE(claim != newClaim);
+ // only this instance behind this shared_ptr keeps the resource alive
+ REQUIRE(claim->getFlowFileRecordOwnedCount() == 1);
+ // one from the FlowFile and one from the persisted instance
+ REQUIRE(newClaim->getFlowFileRecordOwnedCount() == 2);
+ }
+ REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource " + removedResource) == 1);
+ REQUIRE(LogTestController::getInstance().countOccurrences("Deleting resource") == 1);
ff_repository->stop();
flowController->unload();
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 89d9bca..a6717f7 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -39,7 +39,7 @@
auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(claim);
+ auto stream = content_repo->write(*claim);
stream->writeUTF("well hello there");
@@ -55,7 +55,7 @@
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
REQUIRE(content_repo->initialize(configuration));
- auto read_stream = content_repo->read(claim);
+ auto read_stream = content_repo->read(*claim);
std::string readstr;
read_stream->readUTF(readstr);
@@ -80,7 +80,7 @@
auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(claim);
+ auto stream = content_repo->write(*claim);
stream->writeUTF("well hello there");
@@ -97,9 +97,9 @@
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
REQUIRE(content_repo->initialize(configuration));
- content_repo->remove(claim);
+ content_repo->remove(*claim);
- auto read_stream = content_repo->read(claim);
+ auto read_stream = content_repo->read(*claim);
std::string readstr;
@@ -118,7 +118,7 @@
REQUIRE(content_repo->initialize(configuration));
auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(claim);
+ auto stream = content_repo->write(*claim);
// we're writing nothing to the stream.
@@ -135,7 +135,7 @@
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
REQUIRE(content_repo->initialize(configuration));
- auto read_stream = content_repo->read(claim);
+ auto read_stream = content_repo->read(*claim);
std::string readstr;
@@ -143,68 +143,7 @@
REQUIRE(read_stream->readUTF(readstr) == -1);
}
-TEST_CASE("Test Null Claim", "[TestDBCR4]") {
- TestController testController;
- char format[] = "/var/tmp/testRepo.XXXXXX";
- auto dir = testController.createTempDirectory(format);
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
-
- auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
- configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
- REQUIRE(content_repo->initialize(configuration));
-
-
- auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(nullptr);
-
- REQUIRE(stream == nullptr);
-
- auto read_stream = content_repo->write(nullptr);
-
- REQUIRE(read_stream == nullptr);
-}
-
-TEST_CASE("Delete Null Claim", "[TestDBCR5]") {
- TestController testController;
- char format[] = "/var/tmp/testRepo.XXXXXX";
- auto dir = testController.createTempDirectory(format);
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
-
- auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
- configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
- REQUIRE(content_repo->initialize(configuration));
-
- auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(claim);
-
- stream->writeUTF("well hello there");
-
- stream->closeStream();
-
- content_repo->stop();
-
- // reclaim the memory
- content_repo = nullptr;
-
- content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
-
- configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
- configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir);
- REQUIRE(content_repo->initialize(configuration));
-
- REQUIRE(!content_repo->remove(nullptr));
-
- auto read_stream = content_repo->read(claim);
-
- std::string readstr;
-
- // -1 tell us we have an invalid stream
- read_stream->readUTF(readstr);
-
- REQUIRE(readstr == "well hello there");
-}
-
-TEST_CASE("Delete NonExistent Claim", "[TestDBCR5]") {
+TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
TestController testController;
char format[] = "/var/tmp/testRepo.XXXXXX";
auto dir = testController.createTempDirectory(format);
@@ -216,7 +155,7 @@
auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(claim);
+ auto stream = content_repo->write(*claim);
stream->writeUTF("well hello there");
@@ -234,9 +173,9 @@
REQUIRE(content_repo->initialize(configuration));
// we won't complain if it does not exist
- REQUIRE(content_repo->remove(claim2));
+ REQUIRE(content_repo->remove(*claim2));
- auto read_stream = content_repo->read(claim);
+ auto read_stream = content_repo->read(*claim);
std::string readstr;
@@ -246,7 +185,7 @@
REQUIRE(readstr == "well hello there");
}
-TEST_CASE("Delete Remove Count Claim", "[TestDBCR6]") {
+TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
TestController testController;
char format[] = "/var/tmp/testRepo.XXXXXX";
auto dir = testController.createTempDirectory(format);
@@ -258,7 +197,7 @@
auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo);
- auto stream = content_repo->write(claim);
+ auto stream = content_repo->write(*claim);
stream->writeUTF("well hello there");
@@ -278,14 +217,13 @@
// increment twice. verify we have 2 for the stream count
// and then test the removal and verify that the claim was removed by virtue of obtaining
// its count.
- content_repo->incrementStreamCount(claim2);
- content_repo->incrementStreamCount(claim2);
- REQUIRE(content_repo->getStreamCount(claim2) == 2);
- content_repo->decrementStreamCount(claim2);
- content_repo->decrementStreamCount(claim2);
- REQUIRE(content_repo->removeIfOrphaned(claim2));
- REQUIRE(content_repo->getStreamCount(claim2) == 0);
- auto read_stream = content_repo->read(claim);
+ content_repo->incrementStreamCount(*claim2);
+ content_repo->incrementStreamCount(*claim2);
+ REQUIRE(content_repo->getStreamCount(*claim2) == 2);
+ content_repo->decrementStreamCount(*claim2);
+ REQUIRE(content_repo->decrementStreamCount(*claim2) == core::StreamManager<minifi::ResourceClaim>::StreamState::Deleted);
+ REQUIRE(content_repo->getStreamCount(*claim2) == 0);
+ auto read_stream = content_repo->read(*claim);
std::string readstr;
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index aa2ad84..fefea64 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -162,9 +162,10 @@
repository->loadComponent(content_repo);
- std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
{
+ std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ss.str(), content_repo);
+
minifi::FlowFileRecord record(repository, content_repo, attributes, claim);
record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd");
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index 2fc2e7e..db65427 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -343,7 +343,7 @@
auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
if((*content_repo_ptr) && (ff->keepContent == 0)) {
auto claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
- (*content_repo_ptr)->remove(claim);
+ (*content_repo_ptr)->remove(*claim);
}
delete content_repo_ptr;
}
@@ -451,7 +451,7 @@
if(ff->crp && (*content_repo)) {
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
*content_repo);
- auto stream = (*content_repo)->read(claim);
+ auto stream = (*content_repo)->read(*claim);
return stream->read(target, size);
} else {
file_buffer fb = file_to_buffer(ff->contentLocation);
@@ -699,7 +699,7 @@
if(input_ff->crp && (*content_repo)) {
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
*content_repo);
- ff_data->content_stream = (*content_repo)->read(claim);
+ ff_data->content_stream = (*content_repo)->read(*claim);
} else {
ff_data->content_stream = std::make_shared<minifi::io::DataStream>();
file_buffer fb = file_to_buffer(input_ff->contentLocation);