MINIFICPP-1987 Configuring processor metrics with regular expressions
Signed-off-by: Ferenc Gerlits <fgerlits@gmail.com>
This closes #1459
diff --git a/C2.md b/C2.md
index 14c1709..71c40f1 100644
--- a/C2.md
+++ b/C2.md
@@ -117,6 +117,10 @@
nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
+Processor metrics can also be configured using regular expressions with the `processorMetrics/` prefix, so the following definition is also valid:
+
+ nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=processorMetrics/Get.*Metrics
+
This example shows a metrics sub tree defined by the option 'nifi.c2.root.class.definitions'.
This is a comma separated list of all sub trees. In the example, above, only one sub tree exists: metrics.
diff --git a/METRICS.md b/METRICS.md
index fa0732c..bf6ed91 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -59,13 +59,13 @@
# in minifi.properties
- nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+ nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation,processorMetrics/Tail.*
An agent identifier should also be defined to identify which agent the metric is exposed from. If not set, the hostname is used as the identifier.
- # in minifi.properties
+ # in minifi.properties
- nifi.metrics.publisher.agent.identifier=Agent1
+ nifi.metrics.publisher.agent.identifier=Agent1
## System Metrics
@@ -168,6 +168,16 @@
Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix.
+
+All available processor metrics can be requested in the `minifi.properties` by using the following configuration:
+
+ nifi.metrics.publisher.metrics=processorMetrics/.*
+
+Regular expressions can also be used for requesting multiple processor metrics at once, like GetFileMetrics and GetTCPMetrics with the following configuration:
+
+ nifi.metrics.publisher.metrics=processorMetrics/Get.*Metrics
+
### General Metrics
There are general metrics that are available for all processors. Besides these metrics processors can implement additional metrics that are speicific to that processor.
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index c0c1588..20d212c 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -111,7 +111,7 @@
RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
- RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
+ RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
RUN echo nifi.c2.enable=true >> {minifi_root}/conf/minifi.properties
RUN echo nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat >> {minifi_root}/conf/minifi.properties
RUN echo nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge >> {minifi_root}/conf/minifi.properties
diff --git a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
index acf5d80..0e177d5 100644
--- a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
@@ -66,10 +66,14 @@
void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) override {
switch (state_) {
- case TestState::DESCRIBE_SPECIFIC_METRIC: {
+ case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "GetFileMetrics"}});
break;
}
+ case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+ sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "QueueMetrics"}});
+ break;
+ }
case TestState::DESCRIBE_ALL_METRICS: {
sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn);
break;
@@ -81,8 +85,12 @@
void handleAcknowledge(const rapidjson::Document& root) override {
switch (state_) {
- case TestState::DESCRIBE_SPECIFIC_METRIC: {
- verifySpecificMetrics(root);
+ case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
+ verifySpecificProcessorMetrics(root);
+ break;
+ }
+ case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+ verifySpecificSystemMetrics(root);
break;
}
case TestState::DESCRIBE_ALL_METRICS: {
@@ -96,7 +104,8 @@
private:
enum class TestState {
- DESCRIBE_SPECIFIC_METRIC,
+ DESCRIBE_SPECIFIC_PROCESSOR_METRIC,
+ DESCRIBE_SPECIFIC_SYSTEM_METRIC,
DESCRIBE_ALL_METRICS
};
@@ -109,13 +118,22 @@
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
}
- void verifySpecificMetrics(const rapidjson::Document& root) {
+ void verifySpecificProcessorMetrics(const rapidjson::Document& root) {
auto getfile_metrics_verified =
!root.HasMember("metrics") &&
root.HasMember("GetFileMetrics") &&
root["GetFileMetrics"].HasMember(GETFILE1_UUID) &&
root["GetFileMetrics"].HasMember(GETFILE2_UUID);
if (getfile_metrics_verified) {
+ state_ = TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC;
+ }
+ }
+
+ void verifySpecificSystemMetrics(const rapidjson::Document& root) {
+ auto getfile_metrics_verified =
+ !root.HasMember("metrics") &&
+ root.HasMember("QueueMetrics");
+ if (getfile_metrics_verified) {
state_ = TestState::DESCRIBE_ALL_METRICS;
}
}
@@ -136,7 +154,7 @@
}
}
- TestState state_ = TestState::DESCRIBE_SPECIFIC_METRIC;
+ TestState state_ = TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC;
std::atomic_bool& metrics_found_;
};
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp b/extensions/http-curl/tests/C2MetricsTest.cpp
index 16dd9fc..2d717e2 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -207,7 +207,7 @@
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
- harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "GetTCPMetrics");
+ harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "processorMetrics/GetTCP.*");
harness.setKeyDir(args.key_dir);
auto replacement_path = args.test_file;
minifi::utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate");
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 8ad250f..b301256 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -39,22 +39,23 @@
public:
ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* flow_configuration);
- std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root);
- std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
+ void initializeComponentMetrics(core::ProcessGroup* root);
void setControllerServiceProvider(core::controller::ControllerServiceProvider* controller);
void setStateMonitor(state::StateMonitor* update_sink);
- void initializeComponentMetrics(core::ProcessGroup* root);
+ std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const;
private:
+ std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
std::vector<std::shared_ptr<ResponseNode>> getResponseNodes(const std::string& clazz) const;
- void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node);
+ void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const;
static void initializeQueueMetrics(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
- void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node);
- void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node);
- void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
- void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node);
- void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node);
- void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
+ void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const;
+ void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const;
+ void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const;
+ void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const;
+ void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const;
+ void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const;
+ std::vector<std::shared_ptr<ResponseNode>> getMatchingComponentMetricsNodes(const std::string& regex_str) const;
mutable std::mutex component_metrics_mutex_;
std::unordered_map<std::string, std::vector<std::shared_ptr<ResponseNode>>> component_metrics_;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 39ec513..26b6c68 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -210,7 +210,7 @@
};
if (!metrics_class.empty()) {
- auto metrics_nodes = response_node_loader_.getComponentMetricsNodes(metrics_class);
+ auto metrics_nodes = response_node_loader_.loadResponseNodes(metrics_class, root_.get());
if (!metrics_nodes.empty()) {
return createReportedNode(metrics_nodes);
}
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 149fe14..df2f872 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -27,6 +27,8 @@
#include "core/state/nodes/ConfigurationChecksums.h"
#include "c2/C2Agent.h"
#include "utils/gsl.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::state::response {
@@ -75,7 +77,7 @@
return {response_node};
}
-void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const {
auto repository_metrics = dynamic_cast<RepositoryMetrics*>(response_node.get());
if (repository_metrics != nullptr) {
repository_metrics->addRepository(provenance_repo_);
@@ -98,14 +100,14 @@
}
}
-void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const {
auto identifier = dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
if (identifier != nullptr) {
identifier->setAgentIdentificationProvider(configuration_);
}
}
-void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const {
auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get());
if (monitor != nullptr) {
monitor->addRepository(provenance_repo_);
@@ -114,7 +116,7 @@
}
}
-void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const {
auto agent_node = dynamic_cast<state::response::AgentNode*>(response_node.get());
if (agent_node != nullptr && controller_ != nullptr) {
agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(c2::C2Agent::UPDATE_NAME)).get());
@@ -126,7 +128,7 @@
}
}
-void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const {
auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get());
if (agent_status != nullptr) {
agent_status->addRepository(provenance_repo_);
@@ -135,7 +137,7 @@
}
}
-void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const {
auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
if (configuration_checksums) {
configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
@@ -145,7 +147,7 @@
}
}
-void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const {
auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
if (flowMonitor == nullptr) {
return;
@@ -165,7 +167,7 @@
}
}
-std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) {
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const {
auto response_nodes = getResponseNodes(clazz);
if (response_nodes.empty()) {
logger_->log_error("No metric defined for %s", clazz);
@@ -185,9 +187,27 @@
return response_nodes;
}
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getMatchingComponentMetricsNodes(const std::string& regex_str) const {
+ std::vector<std::shared_ptr<ResponseNode>> result;
+ for (const auto& [metric_name, metrics] : component_metrics_) {
+ utils::Regex regex(regex_str);
+ if (utils::regexMatch(metric_name, regex)) {
+ result.insert(result.end(), metrics.begin(), metrics.end());
+ }
+ }
+ return result;
+}
+
std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getComponentMetricsNodes(const std::string& metrics_class) const {
- if (!metrics_class.empty()) {
- std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+ if (metrics_class.empty()) {
+ return {};
+ }
+ std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+ static const std::string PROCESSOR_FILTER_PREFIX = "processorMetrics/";
+ if (utils::StringUtils::startsWith(metrics_class, PROCESSOR_FILTER_PREFIX)) {
+ auto regex_str = metrics_class.substr(PROCESSOR_FILTER_PREFIX.size());
+ return getMatchingComponentMetricsNodes(regex_str);
+ } else {
const auto citer = component_metrics_.find(metrics_class);
if (citer != component_metrics_.end()) {
return citer->second;
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
new file mode 100644
index 0000000..2223467
--- /dev/null
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include "../Catch.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "../ReadFromFlowFileTestProcessor.h"
+#include "../WriteToFlowFileTestProcessor.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "utils/Id.h"
+#include "ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class ResponseNodeLoaderTestFixture {
+ public:
+ ResponseNodeLoaderTestFixture()
+ : root_(std::make_unique<minifi::core::ProcessGroup>(minifi::core::ProcessGroupType::ROOT_PROCESS_GROUP, "root")),
+ configuration_(std::make_shared<minifi::Configure>()),
+ prov_repo_(std::make_shared<TestRepository>()),
+ ff_repository_(std::make_shared<TestRepository>()),
+ content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+ response_node_loader_(configuration_, prov_repo_, ff_repository_, nullptr) {
+ ff_repository_->initialize(configuration_);
+ content_repo_->initialize(configuration_);
+ auto uuid1 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+ auto uuid2 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+ addConnection("Connection", "success", uuid1, uuid2);
+ auto uuid3 = addProcessor<minifi::processors::ReadFromFlowFileTestProcessor>("ReadFromFlowFileTestProcessor");
+ addConnection("Connection", "success", uuid2, uuid3);
+ response_node_loader_.initializeComponentMetrics(root_.get());
+ }
+
+ protected:
+ template<typename T, typename = typename std::enable_if_t<std::is_base_of_v<minifi::core::Processor, T>>>
+ minifi::utils::Identifier addProcessor(const std::string& name) {
+ auto processor = std::make_unique<T>(name);
+ auto uuid = processor->getUUID();
+ root_->addProcessor(std::move(processor));
+ return uuid;
+ }
+
+ void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) {
+ auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name);
+ connection->setRelationship({relationship_name, "d"});
+ connection->setDestinationUUID(src_uuid);
+ connection->setSourceUUID(dst_uuid);
+ root_->addConnection(std::move(connection));
+ }
+
+ std::unique_ptr<minifi::core::ProcessGroup> root_;
+ std::shared_ptr<minifi::Configure> configuration_;
+ std::shared_ptr<TestRepository> prov_repo_;
+ std::shared_ptr<TestRepository> ff_repository_;
+ std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+ minifi::state::response::ResponseNodeLoader response_node_loader_;
+};
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load non-existent response node", "[responseNodeLoaderTest]") {
+ auto nodes = response_node_loader_.loadResponseNodes("NonExistentNode", root_.get());
+ REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node not part of the flow config", "[responseNodeLoaderTest]") {
+ auto nodes = response_node_loader_.loadResponseNodes("TailFileMetrics", root_.get());
+ REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load system metrics node", "[responseNodeLoaderTest]") {
+ auto nodes = response_node_loader_.loadResponseNodes("QueueMetrics", root_.get());
+ REQUIRE(nodes.size() == 1);
+ REQUIRE(nodes[0]->getName() == "QueueMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node part of the flow config", "[responseNodeLoaderTest]") {
+ auto nodes = response_node_loader_.loadResponseNodes("ReadFromFlowFileTestProcessorMetrics", root_.get());
+ REQUIRE(nodes.size() == 1);
+ REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load multiple processor metrics nodes of the same type in a single flow", "[responseNodeLoaderTest]") {
+ auto nodes = response_node_loader_.loadResponseNodes("WriteToFlowFileTestProcessorMetrics", root_.get());
+ REQUIRE(nodes.size() == 2);
+ REQUIRE(nodes[0]->getName() == "WriteToFlowFileTestProcessorMetrics");
+ REQUIRE(nodes[1]->getName() == "WriteToFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Use regex to filter processor metrics", "[responseNodeLoaderTest]") {
+ SECTION("Load all processor metrics with regex") {
+ auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/.*", root_.get());
+ std::unordered_map<std::string, uint32_t> metric_counts;
+ REQUIRE(nodes.size() == 3);
+ for (const auto& node : nodes) {
+ metric_counts[node->getName()]++;
+ }
+ REQUIRE(metric_counts["WriteToFlowFileTestProcessorMetrics"] == 2);
+ REQUIRE(metric_counts["ReadFromFlowFileTestProcessorMetrics"] == 1);
+ }
+
+ SECTION("Filter for a single processor") {
+ auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read.*", root_.get());
+ REQUIRE(nodes.size() == 1);
+ REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+ }
+
+ SECTION("Full match") {
+ auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/ReadFromFlowFileTestProcessorMetrics", root_.get());
+ REQUIRE(nodes.size() == 1);
+ REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+ }
+
+ SECTION("No partial match is allowed") {
+ auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read", root_.get());
+ REQUIRE(nodes.empty());
+ }
+}
+
+} // namespace org::apache::nifi::minifi::test