MINIFICPP-1987 Configuring processor metrics with regular expressions

Signed-off-by: Ferenc Gerlits <fgerlits@gmail.com>
This closes #1459
diff --git a/C2.md b/C2.md
index 14c1709..71c40f1 100644
--- a/C2.md
+++ b/C2.md
@@ -117,6 +117,10 @@
     nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name=ProcessorMetric
     nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=GetFileMetrics
 
+Processor metrics can also be configured using regular expressions with the `processorMetrics/` prefix, so the following definition is also valid:
+
+    nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes=processorMetrics/Get.*Metrics
+
 This example shows a metrics sub tree defined by the option 'nifi.c2.root.class.definitions'.
 
 This is a comma separated list of all sub trees. In the example, above, only one sub tree exists: metrics.
diff --git a/METRICS.md b/METRICS.md
index fa0732c..bf6ed91 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -59,13 +59,13 @@
 
     # in minifi.properties
 
-    nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation
+    nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation,processorMetrics/Tail.*
 
 An agent identifier should also be defined to identify which agent the metric is exposed from. If not set, the hostname is used as the identifier.
 
-	# in minifi.properties
+    # in minifi.properties
 
-	nifi.metrics.publisher.agent.identifier=Agent1
+    nifi.metrics.publisher.agent.identifier=Agent1
 
 ## System Metrics
 
@@ -168,6 +168,16 @@
 
 Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
 
+Besides configuring processor metrics directly, they can also be configured using regular expressions with the `processorMetrics/` prefix.
+
+All available processor metrics can be requested in the `minifi.properties` by using the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/.*
+
+Regular expressions can also be used for requesting multiple processor metrics at once, like GetFileMetrics and GetTCPMetrics with the following configuration:
+
+    nifi.metrics.publisher.metrics=processorMetrics/Get.*Metrics
+
 ### General Metrics
 
 There are general metrics that are available for all processors. Besides these metrics processors can implement additional metrics that are speicific to that processor.
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index c0c1588..20d212c 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -111,7 +111,7 @@
                 RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 >> {minifi_root}/conf/minifi.properties
-                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,GetFileMetrics,GetTCPMetrics,PutFileMetrics,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
+                RUN echo nifi.metrics.publisher.metrics=RepositoryMetrics,QueueMetrics,PutFileMetrics,processorMetrics/Get.*,FlowInformation,DeviceInfoNode,AgentStatus >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.enable=true  >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.rest.url=http://minifi-c2-server:10090/c2/config/heartbeat  >> {minifi_root}/conf/minifi.properties
                 RUN echo nifi.c2.rest.url.ack=http://minifi-c2-server:10090/c2/config/acknowledge  >> {minifi_root}/conf/minifi.properties
diff --git a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
index acf5d80..0e177d5 100644
--- a/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeMetricsTest.cpp
@@ -66,10 +66,14 @@
 
   void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "GetFileMetrics"}});
         break;
       }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "QueueMetrics"}});
+        break;
+      }
       case TestState::DESCRIBE_ALL_METRICS: {
         sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn);
         break;
@@ -81,8 +85,12 @@
 
   void handleAcknowledge(const rapidjson::Document& root) override {
     switch (state_) {
-      case TestState::DESCRIBE_SPECIFIC_METRIC: {
-        verifySpecificMetrics(root);
+      case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: {
+        verifySpecificProcessorMetrics(root);
+        break;
+      }
+      case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: {
+        verifySpecificSystemMetrics(root);
         break;
       }
       case TestState::DESCRIBE_ALL_METRICS: {
@@ -96,7 +104,8 @@
 
  private:
   enum class TestState {
-    DESCRIBE_SPECIFIC_METRIC,
+    DESCRIBE_SPECIFIC_PROCESSOR_METRIC,
+    DESCRIBE_SPECIFIC_SYSTEM_METRIC,
     DESCRIBE_ALL_METRICS
   };
 
@@ -109,13 +118,22 @@
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
   }
 
-  void verifySpecificMetrics(const rapidjson::Document& root) {
+  void verifySpecificProcessorMetrics(const rapidjson::Document& root) {
     auto getfile_metrics_verified =
       !root.HasMember("metrics") &&
       root.HasMember("GetFileMetrics") &&
       root["GetFileMetrics"].HasMember(GETFILE1_UUID) &&
       root["GetFileMetrics"].HasMember(GETFILE2_UUID);
     if (getfile_metrics_verified) {
+      state_ = TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC;
+    }
+  }
+
+  void verifySpecificSystemMetrics(const rapidjson::Document& root) {
+    auto getfile_metrics_verified =
+      !root.HasMember("metrics") &&
+      root.HasMember("QueueMetrics");
+    if (getfile_metrics_verified) {
       state_ = TestState::DESCRIBE_ALL_METRICS;
     }
   }
@@ -136,7 +154,7 @@
     }
   }
 
-  TestState state_ = TestState::DESCRIBE_SPECIFIC_METRIC;
+  TestState state_ = TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC;
   std::atomic_bool& metrics_found_;
 };
 
diff --git a/extensions/http-curl/tests/C2MetricsTest.cpp b/extensions/http-curl/tests/C2MetricsTest.cpp
index 16dd9fc..2d717e2 100644
--- a/extensions/http-curl/tests/C2MetricsTest.cpp
+++ b/extensions/http-curl/tests/C2MetricsTest.cpp
@@ -207,7 +207,7 @@
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
   harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
-  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "GetTCPMetrics");
+  harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.classes", "processorMetrics/GetTCP.*");
   harness.setKeyDir(args.key_dir);
   auto replacement_path = args.test_file;
   minifi::utils::StringUtils::replaceAll(replacement_path, "TestC2Metrics", "TestC2MetricsUpdate");
diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
index 8ad250f..b301256 100644
--- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h
+++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -39,22 +39,23 @@
  public:
   ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::shared_ptr<core::Repository> provenance_repo,
     std::shared_ptr<core::Repository> flow_file_repo, core::FlowConfiguration* flow_configuration);
-  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root);
-  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
+  void initializeComponentMetrics(core::ProcessGroup* root);
   void setControllerServiceProvider(core::controller::ControllerServiceProvider* controller);
   void setStateMonitor(state::StateMonitor* update_sink);
-  void initializeComponentMetrics(core::ProcessGroup* root);
+  std::vector<std::shared_ptr<ResponseNode>> loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const;
 
  private:
+  std::vector<std::shared_ptr<ResponseNode>> getComponentMetricsNodes(const std::string& metrics_class) const;
   std::vector<std::shared_ptr<ResponseNode>> getResponseNodes(const std::string& clazz) const;
-  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node);
+  void initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const;
   static void initializeQueueMetrics(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
-  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node);
-  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root);
+  void initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const;
+  void initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const;
+  std::vector<std::shared_ptr<ResponseNode>> getMatchingComponentMetricsNodes(const std::string& regex_str) const;
 
   mutable std::mutex component_metrics_mutex_;
   std::unordered_map<std::string, std::vector<std::shared_ptr<ResponseNode>>> component_metrics_;
diff --git a/libminifi/src/c2/C2Client.cpp b/libminifi/src/c2/C2Client.cpp
index 39ec513..26b6c68 100644
--- a/libminifi/src/c2/C2Client.cpp
+++ b/libminifi/src/c2/C2Client.cpp
@@ -210,7 +210,7 @@
   };
 
   if (!metrics_class.empty()) {
-    auto metrics_nodes = response_node_loader_.getComponentMetricsNodes(metrics_class);
+    auto metrics_nodes = response_node_loader_.loadResponseNodes(metrics_class, root_.get());
     if (!metrics_nodes.empty()) {
       return createReportedNode(metrics_nodes);
     }
diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
index 149fe14..df2f872 100644
--- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
+++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
@@ -27,6 +27,8 @@
 #include "core/state/nodes/ConfigurationChecksums.h"
 #include "c2/C2Agent.h"
 #include "utils/gsl.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
 
 namespace org::apache::nifi::minifi::state::response {
 
@@ -75,7 +77,7 @@
   return {response_node};
 }
 
-void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeRepositoryMetrics(const std::shared_ptr<ResponseNode>& response_node) const {
   auto repository_metrics = dynamic_cast<RepositoryMetrics*>(response_node.get());
   if (repository_metrics != nullptr) {
     repository_metrics->addRepository(provenance_repo_);
@@ -98,14 +100,14 @@
   }
 }
 
-void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentIdentifier(const std::shared_ptr<ResponseNode>& response_node) const {
   auto identifier = dynamic_cast<state::response::AgentIdentifier*>(response_node.get());
   if (identifier != nullptr) {
     identifier->setAgentIdentificationProvider(configuration_);
   }
 }
 
-void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentMonitor(const std::shared_ptr<ResponseNode>& response_node) const {
   auto monitor = dynamic_cast<state::response::AgentMonitor*>(response_node.get());
   if (monitor != nullptr) {
     monitor->addRepository(provenance_repo_);
@@ -114,7 +116,7 @@
   }
 }
 
-void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentNode(const std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_node = dynamic_cast<state::response::AgentNode*>(response_node.get());
   if (agent_node != nullptr && controller_ != nullptr) {
     agent_node->setUpdatePolicyController(std::static_pointer_cast<controllers::UpdatePolicyControllerService>(controller_->getControllerService(c2::C2Agent::UPDATE_NAME)).get());
@@ -126,7 +128,7 @@
   }
 }
 
-void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeAgentStatus(const std::shared_ptr<ResponseNode>& response_node) const {
   auto agent_status = dynamic_cast<state::response::AgentStatus*>(response_node.get());
   if (agent_status != nullptr) {
     agent_status->addRepository(provenance_repo_);
@@ -135,7 +137,7 @@
   }
 }
 
-void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) {
+void ResponseNodeLoader::initializeConfigurationChecksums(const std::shared_ptr<ResponseNode>& response_node) const {
   auto configuration_checksums = dynamic_cast<state::response::ConfigurationChecksums*>(response_node.get());
   if (configuration_checksums) {
     configuration_checksums->addChecksumCalculator(configuration_->getChecksumCalculator());
@@ -145,7 +147,7 @@
   }
 }
 
-void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) {
+void ResponseNodeLoader::initializeFlowMonitor(const std::shared_ptr<ResponseNode>& response_node, core::ProcessGroup* root) const {
   auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get());
   if (flowMonitor == nullptr) {
     return;
@@ -165,7 +167,7 @@
   }
 }
 
-std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) {
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::loadResponseNodes(const std::string& clazz, core::ProcessGroup* root) const {
   auto response_nodes = getResponseNodes(clazz);
   if (response_nodes.empty()) {
     logger_->log_error("No metric defined for %s", clazz);
@@ -185,9 +187,27 @@
   return response_nodes;
 }
 
+std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getMatchingComponentMetricsNodes(const std::string& regex_str) const {
+  std::vector<std::shared_ptr<ResponseNode>> result;
+  for (const auto& [metric_name, metrics] : component_metrics_) {
+    utils::Regex regex(regex_str);
+    if (utils::regexMatch(metric_name, regex)) {
+      result.insert(result.end(), metrics.begin(), metrics.end());
+    }
+  }
+  return result;
+}
+
 std::vector<std::shared_ptr<ResponseNode>> ResponseNodeLoader::getComponentMetricsNodes(const std::string& metrics_class) const {
-  if (!metrics_class.empty()) {
-    std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  if (metrics_class.empty()) {
+    return {};
+  }
+  std::lock_guard<std::mutex> lock(component_metrics_mutex_);
+  static const std::string PROCESSOR_FILTER_PREFIX = "processorMetrics/";
+  if (utils::StringUtils::startsWith(metrics_class, PROCESSOR_FILTER_PREFIX)) {
+    auto regex_str = metrics_class.substr(PROCESSOR_FILTER_PREFIX.size());
+    return getMatchingComponentMetricsNodes(regex_str);
+  } else {
     const auto citer = component_metrics_.find(metrics_class);
     if (citer != component_metrics_.end()) {
       return citer->second;
diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
new file mode 100644
index 0000000..2223467
--- /dev/null
+++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include "../Catch.h"
+#include "core/state/nodes/ResponseNodeLoader.h"
+#include "../ReadFromFlowFileTestProcessor.h"
+#include "../WriteToFlowFileTestProcessor.h"
+#include "core/repository/VolatileContentRepository.h"
+#include "utils/Id.h"
+#include "ProvenanceTestHelper.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class ResponseNodeLoaderTestFixture {
+ public:
+  ResponseNodeLoaderTestFixture()
+    : root_(std::make_unique<minifi::core::ProcessGroup>(minifi::core::ProcessGroupType::ROOT_PROCESS_GROUP, "root")),
+      configuration_(std::make_shared<minifi::Configure>()),
+      prov_repo_(std::make_shared<TestRepository>()),
+      ff_repository_(std::make_shared<TestRepository>()),
+      content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
+      response_node_loader_(configuration_, prov_repo_, ff_repository_, nullptr) {
+    ff_repository_->initialize(configuration_);
+    content_repo_->initialize(configuration_);
+    auto uuid1 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    auto uuid2 = addProcessor<minifi::processors::WriteToFlowFileTestProcessor>("WriteToFlowFileTestProcessor1");
+    addConnection("Connection", "success", uuid1, uuid2);
+    auto uuid3 = addProcessor<minifi::processors::ReadFromFlowFileTestProcessor>("ReadFromFlowFileTestProcessor");
+    addConnection("Connection", "success", uuid2, uuid3);
+    response_node_loader_.initializeComponentMetrics(root_.get());
+  }
+
+ protected:
+  template<typename T, typename = typename std::enable_if_t<std::is_base_of_v<minifi::core::Processor, T>>>
+  minifi::utils::Identifier addProcessor(const std::string& name) {
+    auto processor = std::make_unique<T>(name);
+    auto uuid = processor->getUUID();
+    root_->addProcessor(std::move(processor));
+    return uuid;
+  }
+
+  void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) {
+    auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name);
+    connection->setRelationship({relationship_name, "d"});
+    connection->setDestinationUUID(src_uuid);
+    connection->setSourceUUID(dst_uuid);
+    root_->addConnection(std::move(connection));
+  }
+
+  std::unique_ptr<minifi::core::ProcessGroup> root_;
+  std::shared_ptr<minifi::Configure> configuration_;
+  std::shared_ptr<TestRepository> prov_repo_;
+  std::shared_ptr<TestRepository> ff_repository_;
+  std::shared_ptr<minifi::core::ContentRepository> content_repo_;
+  minifi::state::response::ResponseNodeLoader response_node_loader_;
+};
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load non-existent response node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("NonExistentNode", root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node not part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("TailFileMetrics", root_.get());
+  REQUIRE(nodes.empty());
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load system metrics node", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("QueueMetrics", root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "QueueMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load processor metrics node part of the flow config", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("ReadFromFlowFileTestProcessorMetrics", root_.get());
+  REQUIRE(nodes.size() == 1);
+  REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Load multiple processor metrics nodes of the same type in a single flow", "[responseNodeLoaderTest]") {
+  auto nodes = response_node_loader_.loadResponseNodes("WriteToFlowFileTestProcessorMetrics", root_.get());
+  REQUIRE(nodes.size() == 2);
+  REQUIRE(nodes[0]->getName() == "WriteToFlowFileTestProcessorMetrics");
+  REQUIRE(nodes[1]->getName() == "WriteToFlowFileTestProcessorMetrics");
+}
+
+TEST_CASE_METHOD(ResponseNodeLoaderTestFixture, "Use regex to filter processor metrics", "[responseNodeLoaderTest]") {
+  SECTION("Load all processor metrics with regex") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/.*", root_.get());
+    std::unordered_map<std::string, uint32_t> metric_counts;
+    REQUIRE(nodes.size() == 3);
+    for (const auto& node : nodes) {
+      metric_counts[node->getName()]++;
+    }
+    REQUIRE(metric_counts["WriteToFlowFileTestProcessorMetrics"] == 2);
+    REQUIRE(metric_counts["ReadFromFlowFileTestProcessorMetrics"] == 1);
+  }
+
+  SECTION("Filter for a single processor") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read.*", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("Full match") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/ReadFromFlowFileTestProcessorMetrics", root_.get());
+    REQUIRE(nodes.size() == 1);
+    REQUIRE(nodes[0]->getName() == "ReadFromFlowFileTestProcessorMetrics");
+  }
+
+  SECTION("No partial match is allowed") {
+    auto nodes = response_node_loader_.loadResponseNodes("processorMetrics/Read", root_.get());
+    REQUIRE(nodes.empty());
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::test