MINIFICPP-2394 fix LogAttribute AttributesToLog and AttributesToIgnor…
Closes #1810
Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 9dd1230..930dd35 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1844,16 +1844,16 @@
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
-| Name | Default Value | Allowable Values | Description |
-|-----------------------------|---------------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Log Level | | info<br/>trace<br/>error<br/>warn<br/>debug | The Log Level to use when logging the Attributes |
-| Attributes to Log | | | A comma-separated list of Attributes to Log. If not specified, all attributes will be logged. |
-| Attributes to Ignore | | | A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored. |
-| Log Payload | false | true<br/>false | If true, the FlowFile's payload will be logged, in addition to its attributes. Otherwise, just the Attributes will be logged. |
-| Hexencode Payload | false | true<br/>false | If true, the FlowFile's payload will be logged in a hexencoded format |
-| Maximum Payload Line Length | 0 | | The logged payload will be broken into lines this long. 0 means no newlines will be added. |
-| Log Prefix | | | Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors. |
-| FlowFiles To Log | 1 | | Number of flow files to log. If set to zero all flow files will be logged. Please note that this may block other threads from running if not used judiciously. |
+| Name | Default Value | Allowable Values | Description |
+|-----------------------------|---------------|------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Log Level | | trace<br>debug<br>info<br>warn<br>error<br>critical | The Log Level to use when logging the Attributes |
+| Attributes to Log | | | A comma-separated list of Attributes to Log. If not specified, all attributes will be logged. |
+| Attributes to Ignore | | | A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored. |
+| Log Payload | false | true<br/>false | If true, the FlowFile's payload will be logged, in addition to its attributes. Otherwise, just the Attributes will be logged. |
+| Hexencode Payload | false | true<br/>false | If true, the FlowFile's payload will be logged in a hexencoded format |
+| Maximum Payload Line Length | 0 | | The logged payload will be broken into lines this long. 0 means no newlines will be added. |
+| Log Prefix | | | Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors. |
+| FlowFiles To Log | 1 | | Number of flow files to log. If set to zero all flow files will be logged. Please note that this may block other threads from running if not used judiciously. |
### Relationships
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index 78abb08..61d042a 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -18,7 +18,6 @@
* limitations under the License.
*/
#include "LogAttribute.h"
-#include <ctime>
#include <cstring>
#include <memory>
#include <string>
@@ -27,11 +26,13 @@
#include <map>
#include <sstream>
#include <iostream>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
+
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
@@ -48,98 +49,93 @@
context.getProperty(MaxPayloadLineLength, max_line_length_);
logger_->log_debug("Maximum Payload Line Length: {}", max_line_length_);
-}
-// OnTrigger method, implemented by NiFi LogAttribute
-void LogAttribute::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
- logger_->log_trace("enter log attribute, attempting to retrieve {} flow files", flowfiles_to_log_);
- std::string dashLine = "--------------------------------------------------";
- LogAttrLevel level = LogAttrLevelInfo;
- bool logPayload = false;
- uint64_t i = 0;
- const auto max = flowfiles_to_log_ == 0 ? UINT64_MAX : flowfiles_to_log_;
- for (; i < max; ++i) {
+ if (auto attributes_to_log_str = context.getProperty(AttributesToLog); attributes_to_log_str && !attributes_to_log_str->empty()) {
+ if (auto attrs_to_log_vec = utils::string::split(*attributes_to_log_str, ","); !attrs_to_log_vec.empty())
+ attributes_to_log_.emplace(std::make_move_iterator(attrs_to_log_vec.begin()), std::make_move_iterator(attrs_to_log_vec.end()));
+ }
+
+ if (auto attributes_to_ignore_str = context.getProperty(AttributesToIgnore); attributes_to_ignore_str && !attributes_to_ignore_str->empty()) {
+ if (auto attrs_to_ignore_vec = utils::string::split(*attributes_to_ignore_str, ","); !attrs_to_ignore_vec.empty())
+ attributes_to_ignore_.emplace(std::make_move_iterator(attrs_to_ignore_vec.begin()), std::make_move_iterator(attrs_to_ignore_vec.end()));
+ }
+
+ if (auto log_level_str = context.getProperty(LogLevel)) {
+ if (auto result = magic_enum::enum_cast<core::logging::LOG_LEVEL>(*log_level_str)) {
+ log_level_ = *result;
+ } else if (*log_level_str == "error") { // TODO(MINIFICPP-2294) this could be avoided if config files were properly migrated
+ log_level_ = core::logging::err;
+ }
+ }
+
+ if (auto log_prefix = context.getProperty(LogPrefix); log_prefix && !log_prefix->empty()) {
+ dash_line_ = fmt::format("{:-^50}", *log_prefix);
+ }
+
+ log_payload_ = context.getProperty<bool>(LogPayload).value_or(false);
+}
+
+std::string LogAttribute::generateLogMessage(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file) const {
+ std::ostringstream message;
+ message << "Logging for flow file" << "\n";
+ message << dash_line_;
+ message << "\nStandard FlowFile Attributes";
+ message << "\n" << "UUID:" << flow_file->getUUIDStr();
+ message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow_file->getEntryDate());
+ message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow_file->getlineageStartDate());
+ message << "\n" << "Size:" << flow_file->getSize() << " Offset:" << flow_file->getOffset();
+ message << "\nFlowFile Attributes Map Content";
+ for (const auto& [attr_key, attr_value] : flow_file->getAttributes()) {
+ if (attributes_to_ignore_ && attributes_to_ignore_->contains(attr_key))
+ continue;
+ if (attributes_to_log_ && !attributes_to_log_->contains(attr_key))
+ continue;
+ message << "\n" << "key:" << attr_key << " value:" << attr_value;
+ }
+ message << "\nFlowFile Resource Claim Content";
+ if (const auto claim = flow_file->getResourceClaim()) {
+ message << "\n" << "Content Claim:" << claim->getContentFullPath();
+ }
+ if (log_payload_ && flow_file->getSize() <= 1024 * 1024) {
+ message << "\n" << "Payload:" << "\n";
+ const auto read_result = session.readBuffer(flow_file);
+
+ std::string printable_payload;
+ if (hexencode_) {
+ printable_payload = utils::string::to_hex(read_result.buffer);
+ } else {
+ printable_payload = to_string(read_result);
+ }
+
+ if (max_line_length_ == 0U) {
+ message << printable_payload << "\n";
+ } else {
+ for (size_t j = 0; j < printable_payload.size(); j += max_line_length_) {
+ message << printable_payload.substr(j, max_line_length_) << '\n';
+ }
+ }
+ } else {
+ message << "\n";
+ }
+ message << dash_line_;
+ return message.str();
+}
+
+void LogAttribute::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
+ logger_->log_trace("enter log attribute, attempting to retrieve {} flow files", flowfiles_to_log_);
+ const auto max_flow_files_to_process = flowfiles_to_log_ == 0 ? UINT64_MAX : flowfiles_to_log_;
+ uint64_t flow_files_processed = 0;
+ for (; flow_files_processed < max_flow_files_to_process; ++flow_files_processed) {
std::shared_ptr<core::FlowFile> flow = session.get();
if (!flow) {
break;
}
- std::string value;
- if (context.getProperty(LogLevel, value)) {
- logLevelStringToEnum(value, level);
- }
- if (context.getProperty(LogPrefix, value)) {
- dashLine = "-----" + value + "-----";
- }
-
- context.getProperty(LogPayload, logPayload);
-
- std::ostringstream message;
- message << "Logging for flow file " << "\n";
- message << dashLine;
- message << "\nStandard FlowFile Attributes";
- message << "\n" << "UUID:" << flow->getUUIDStr();
- message << "\n" << "EntryDate:" << utils::timeutils::getTimeStr(flow->getEntryDate());
- message << "\n" << "lineageStartDate:" << utils::timeutils::getTimeStr(flow->getlineageStartDate());
- message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset();
- message << "\nFlowFile Attributes Map Content";
- std::map<std::string, std::string> attrs = flow->getAttributes();
- std::map<std::string, std::string>::iterator it;
- for (it = attrs.begin(); it != attrs.end(); it++) {
- message << "\n" << "key:" << it->first << " value:" << it->second;
- }
- message << "\nFlowFile Resource Claim Content";
- std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
- if (claim) {
- message << "\n" << "Content Claim:" << claim->getContentFullPath();
- }
- if (logPayload && flow->getSize() <= 1024 * 1024) {
- message << "\n" << "Payload:" << "\n";
- const auto read_result = session.readBuffer(flow);
-
- std::string printable_payload;
- if (hexencode_) {
- printable_payload = utils::string::to_hex(read_result.buffer);
- } else {
- printable_payload = to_string(read_result);
- }
-
- if (max_line_length_ == 0U) {
- message << printable_payload << "\n";
- } else {
- for (size_t i = 0; i < printable_payload.size(); i += max_line_length_) {
- message << printable_payload.substr(i, max_line_length_) << '\n';
- }
- }
- } else {
- message << "\n";
- }
- message << dashLine;
- std::string output = message.str();
-
- switch (level) {
- case LogAttrLevelInfo:
- logger_->log_info("{}", output);
- break;
- case LogAttrLevelDebug:
- logger_->log_debug("{}", output);
- break;
- case LogAttrLevelError:
- logger_->log_error("{}", output);
- break;
- case LogAttrLevelTrace:
- logger_->log_trace("{}", output);
- break;
- case LogAttrLevelWarn:
- logger_->log_warn("{}", output);
- break;
- default:
- break;
- }
+ logger_->log_with_level(log_level_, "{}", generateLogMessage(session, flow));
session.transfer(flow, Success);
}
- logger_->log_debug("Logged {} flow files", i);
+ logger_->log_debug("Logged {} flow files", flow_files_processed);
}
REGISTER_RESOURCE(LogAttribute, Processor);
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 3eb1ef7..a14c546 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -33,14 +33,13 @@
#include "core/PropertyType.h"
#include "core/RelationshipDefinition.h"
#include "core/logging/LoggerConfiguration.h"
-#include "utils/gsl.h"
#include "utils/Export.h"
namespace org::apache::nifi::minifi::processors {
class LogAttribute : public core::Processor {
public:
- explicit LogAttribute(std::string_view name, const utils::Identifier& uuid = {})
+ explicit LogAttribute(const std::string_view name, const utils::Identifier& uuid = {})
: Processor(name, uuid) {
logger_->set_max_log_size(-1);
}
@@ -48,9 +47,9 @@
EXTENSIONAPI static constexpr const char* Description = "Logs attributes of flow files in the MiNiFi application log.";
- EXTENSIONAPI static constexpr auto LogLevel = core::PropertyDefinitionBuilder<5>::createProperty("Log Level")
+ EXTENSIONAPI static constexpr auto LogLevel = core::PropertyDefinitionBuilder<6>::createProperty("Log Level")
.withDescription("The Log Level to use when logging the Attributes")
- .withAllowedValues({"info", "trace", "error", "warn", "debug"})
+ .withAllowedValues({"trace", "debug", "info", "warn", "error", "critical"})
.build();
EXTENSIONAPI static constexpr auto AttributesToLog = core::PropertyDefinitionBuilder<>::createProperty("Attributes to Log")
.withDescription("A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.")
@@ -103,44 +102,22 @@
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
- enum LogAttrLevel {
- LogAttrLevelTrace,
- LogAttrLevelDebug,
- LogAttrLevelInfo,
- LogAttrLevelWarn,
- LogAttrLevelError
- };
-
- static bool logLevelStringToEnum(const std::string &logStr, LogAttrLevel &level) {
- if (logStr == "trace") {
- level = LogAttrLevelTrace;
- return true;
- } else if (logStr == "debug") {
- level = LogAttrLevelDebug;
- return true;
- } else if (logStr == "info") {
- level = LogAttrLevelInfo;
- return true;
- } else if (logStr == "warn") {
- level = LogAttrLevelWarn;
- return true;
- } else if (logStr == "error") {
- level = LogAttrLevelError;
- return true;
- } else {
- return false;
- }
- }
-
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;
private:
+ std::string generateLogMessage(core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flow_file) const;
+
uint64_t flowfiles_to_log_{1};
bool hexencode_{false};
uint32_t max_line_length_{80};
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<LogAttribute>::getLogger(uuid_);
+ core::logging::LOG_LEVEL log_level_{core::logging::LOG_LEVEL::info};
+ std::string dash_line_ = "--------------------------------------------------";
+ bool log_payload_ = false;
+ std::optional<std::unordered_set<std::string>> attributes_to_log_;
+ std::optional<std::unordered_set<std::string>> attributes_to_ignore_;
};
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ExtractTextTests.cpp b/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
index 197f7d3..c99a10f 100644
--- a/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
+++ b/extensions/standard-processors/tests/unit/ExtractTextTests.cpp
@@ -77,7 +77,6 @@
plan->setProperty(maprocessor, org::apache::nifi::minifi::processors::ExtractText::Attribute, TEST_ATTR);
std::shared_ptr<core::Processor> laprocessor = plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);
- plan->setProperty(laprocessor, org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog, TEST_ATTR);
auto test_file_path = temp_dir / TEST_FILE;
@@ -146,7 +145,6 @@
plan->setDynamicProperty(maprocessor, "InvalidRegex", "[Invalid)A(F)");
std::shared_ptr<core::Processor> laprocessor = plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);
- plan->setProperty(laprocessor, org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog, TEST_ATTR);
auto test_file_path = dir / TEST_FILE;
@@ -214,7 +212,6 @@
plan->setDynamicProperty(extract_text_processor, "RegexAttr", "Speed limit (.*)");
auto log_attribute_processor = plan->addProcessor("LogAttribute", "outputLogAttribute", core::Relationship("success", "description"), true);
- plan->setProperty(log_attribute_processor, org::apache::nifi::minifi::processors::LogAttribute::AttributesToLog, TEST_ATTR);
std::string additional_long_string(100'000, '.');
minifi::test::utils::putFileToDir(dir, TEST_FILE, "Speed limit 80" + additional_long_string);
diff --git a/extensions/standard-processors/tests/unit/LogAttributeTests.cpp b/extensions/standard-processors/tests/unit/LogAttributeTests.cpp
new file mode 100644
index 0000000..4a3045a
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/LogAttributeTests.cpp
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+#include <catch2/generators/catch_generators.hpp>
+
+#include "unit/Catch.h"
+#include "processors/LogAttribute.h"
+#include "unit/SingleProcessorTestController.h"
+#include "unit/TestUtils.h"
+
+using LogAttribute = org::apache::nifi::minifi::processors::LogAttribute;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("LogAttribute logs payload", "[LogAttribute]") {
+ const auto log_attribute = std::make_shared<LogAttribute>("log_attribute");
+ SingleProcessorTestController controller{log_attribute};
+ LogTestController::getInstance().setTrace<LogAttribute>();
+
+ const auto [hexencode_payload, expected_payload] = GENERATE(
+ std::make_tuple("false", "hello world"),
+ std::make_tuple("true", "68656c6c6f20776f726c64"));
+
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::LogPayload, "true"));
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::HexencodePayload, hexencode_payload));
+
+ controller.plan->scheduleProcessor(log_attribute);
+ const auto result = controller.trigger("hello world", {{"eng", "apple"}, {"ger", "Apfel"}, {"fra", "pomme"}});
+ CHECK(result.at(LogAttribute::Success).size() == 1);
+ CHECK(LogTestController::getInstance().contains("--------------------------------------------------", 1s));
+ CHECK(LogTestController::getInstance().contains("Size:11 Offset:0", 0s));
+ CHECK(LogTestController::getInstance().contains("FlowFile Attributes Map Content", 0s));
+ CHECK(LogTestController::getInstance().contains("key:eng value:apple", 0s));
+ CHECK(LogTestController::getInstance().contains("key:ger value:Apfel", 0s));
+ CHECK(LogTestController::getInstance().contains("key:fra value:pomme", 0s));
+
+ CHECK(LogTestController::getInstance().contains(fmt::format("Payload:\n{}", expected_payload), 0s));
+}
+
+TEST_CASE("LogAttribute LogLevel and LogPrefix", "[LogAttribute]") {
+ const auto log_attribute = std::make_shared<LogAttribute>("log_attribute");
+ SingleProcessorTestController controller{log_attribute};
+ LogTestController::getInstance().setTrace<LogAttribute>();
+
+ const auto [log_level, log_prefix, expected_dash] = GENERATE(
+ std::make_tuple("info", "", "--------------------------------------------------"),
+ std::make_tuple("critical", "foo", "-----------------------foo------------------------"),
+ std::make_tuple("debug", "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi mollis neque sit amet dui pretium sodales.",
+ "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Morbi mollis neque sit amet dui pretium sodales."),
+ std::make_tuple("error", "", "--------------------------------------------------"));
+
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::LogLevel, log_level));
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::LogPrefix, log_prefix));
+
+ controller.plan->scheduleProcessor(log_attribute);
+ const auto result = controller.trigger("hello world", {{"eng", "apple"}, {"ger", "Apfel"}, {"fra", "pomme"}});
+ CHECK(result.at(LogAttribute::Success).size() == 1);
+ CHECK(LogTestController::getInstance().contains(fmt::format("[org::apache::nifi::minifi::processors::LogAttribute] [{}] Logging for flow file\n{}", log_level, expected_dash), 1s));
+ CHECK(LogTestController::getInstance().contains("key:fra value:pomme", 0s));
+ CHECK(LogTestController::getInstance().contains("Size:11 Offset:0", 0s));
+ CHECK(LogTestController::getInstance().contains("FlowFile Attributes Map Content", 0s));
+ CHECK(LogTestController::getInstance().contains("key:eng value:apple", 0s));
+ CHECK(LogTestController::getInstance().contains("key:ger value:Apfel", 0s));
+ CHECK(LogTestController::getInstance().contains("key:fra value:pomme", 0s));
+}
+
+TEST_CASE("LogAttribute filtering attributes", "[LogAttribute]") {
+ const auto log_attribute = std::make_shared<LogAttribute>("log_attribute");
+ SingleProcessorTestController controller{log_attribute};
+ LogTestController::getInstance().setTrace<LogAttribute>();
+
+ auto attrs_to_log = "";
+ auto attrs_to_ignore = "";
+ auto expected_eng = true;
+ auto expected_ger = true;
+ auto expected_fra = true;
+
+ SECTION("Default") {
+ }
+
+ SECTION("Ignore eng and fra") {
+ attrs_to_ignore = "eng,fra";
+ expected_eng = false;
+ expected_fra = false;
+ }
+
+ SECTION("Log eng and fra") {
+ attrs_to_log = "eng,fra";
+ expected_ger = false;
+ }
+
+ SECTION("Log eng and fra, ignore fra") {
+ attrs_to_log = "eng,fra";
+ attrs_to_ignore = "fra";
+ expected_fra = false;
+ expected_ger = false;
+ }
+
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::AttributesToLog, attrs_to_log));
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::AttributesToIgnore, attrs_to_ignore));
+
+
+ controller.plan->scheduleProcessor(log_attribute);
+ const auto result = controller.trigger("hello world", {{"eng", "apple"}, {"ger", "Apfel"}, {"fra", "pomme"}});
+ CHECK(result.at(LogAttribute::Success).size() == 1);
+ CHECK(LogTestController::getInstance().contains("--------------------------------------------------", 1s));
+ CHECK(LogTestController::getInstance().contains("Size:11 Offset:0", 0s));
+ CHECK(LogTestController::getInstance().contains("FlowFile Attributes Map Content", 0s));
+ CHECK(LogTestController::getInstance().contains("key:eng value:apple", 0s) == expected_eng);
+ CHECK(LogTestController::getInstance().contains("key:ger value:Apfel", 0s) == expected_ger);
+ CHECK(LogTestController::getInstance().contains("key:fra value:pomme", 0s) == expected_fra);
+}
+
+TEST_CASE("LogAttribute batch test", "[LogAttribute]") {
+ const auto log_attribute = std::make_shared<LogAttribute>("log_attribute");
+ SingleProcessorTestController controller{log_attribute};
+
+ const auto [flow_files_to_log, expected_success_flow_files] = GENERATE(
+ std::make_tuple("0", 3U),
+ std::make_tuple("1", 1U),
+ std::make_tuple("2", 2U));
+
+ REQUIRE(controller.plan->setProperty(log_attribute, LogAttribute::FlowFilesToLog, flow_files_to_log));
+
+ controller.plan->scheduleProcessor(log_attribute);
+ const auto results = controller.trigger({{"first", {{"foo_key", "first_value"}}}, {"second", {{"foo_key", "second_value"}}}, {"third", {{"foo_key", "third_value"}}}});
+ CHECK(results.at(LogAttribute::Success).size() == expected_success_flow_files);
+}
+} // namespace org::apache::nifi::minifi::test