MINIFICPP-1962 Implement communication between process group through ports

Closes #1451
Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index 01e162d..784de18 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -97,8 +97,8 @@
 endif()
 
 add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml"  "${TEST_RESOURCES}/")
-
 add_test(NAME TailFileCronTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFileCron.yml"  "${TEST_RESOURCES}/")
+add_test(NAME ProcessGroupTest COMMAND ProcessGroupTest "${TEST_RESOURCES}/TestProcessGroup.yml")
 
 FOREACH(resourcefile ${RESOURCE_APPS})
     get_filename_component(resourcefilename "${resourcefile}" NAME_WE)
diff --git a/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp b/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
new file mode 100644
index 0000000..5e83586
--- /dev/null
+++ b/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include <string>
+
+#include "core/logging/Logger.h"
+#include "FlowController.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+class ProcessGroupTestHarness : public IntegrationBase {
+ public:
+  ProcessGroupTestHarness() : IntegrationBase(2s) {
+  }
+
+  void testSetup() override {
+    LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+    LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+    LogTestController::getInstance().setTrace<minifi::processors::UpdateAttribute>();
+  }
+
+  void runAssertions() override {
+    using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+    assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+        "key:test_attribute value:success"));
+  }
+};
+
+int main(int argc, char **argv) {
+  std::string test_file_location;
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+
+  ProcessGroupTestHarness harness;
+  harness.run(test_file_location);
+
+  return 0;
+}
diff --git a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
index 58a0dac..bb9d167 100644
--- a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
+++ b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
@@ -60,9 +60,21 @@
   }
 };
 
+enum class ConnectionFailure {
+  UNRESOLVED_SOURCE,
+  UNRESOLVED_DESTINATION,
+  INPUT_CANNOT_BE_SOURCE,
+  OUTPUT_CANNOT_BE_DESTINATION,
+  INPUT_CANNOT_BE_DESTINATION,
+  OUTPUT_CANNOT_BE_SOURCE
+};
+
 struct Proc {
+  Proc(std::string id, std::string name, const std::optional<ConnectionFailure>& failure = std::nullopt)
+    : id(std::move(id)), name(std::move(name)), failure(failure) {}
   std::string id;
   std::string name;
+  std::optional<ConnectionFailure> failure;
 
   Lines serialize() const {
     return {{
@@ -73,17 +85,33 @@
   }
 };
 
-struct UnresolvedProc {
-  explicit UnresolvedProc(std::string id): id(std::move(id)) {}
+template<typename Tag>
+struct Port {
+  Port(std::string id, std::string name, const std::optional<ConnectionFailure>& failure = std::nullopt)
+    : id(std::move(id)), name(std::move(name)), failure(failure) {}
   std::string id;
+  std::string name;
+  std::optional<ConnectionFailure> failure;
+
+  Lines serialize() const {
+    return {{
+      "- id: " + id,
+      "  name: " + name
+    }};
+  }
 };
 
+using InputPort = Port<struct InputTag>;
+using OutputPort = Port<struct OutputTag>;
+
 struct MaybeProc {
-  MaybeProc(const Proc& proc): id(proc.id), name(proc.name) {}  // NOLINT
-  MaybeProc(const UnresolvedProc& proc) : id(proc.id) {}  // NOLINT
+  MaybeProc(const Proc& proc) : id(proc.id), name(proc.name), failure(proc.failure) {}  // NOLINT(runtime/explicit)
+  MaybeProc(const InputPort& port) : id(port.id), name(port.name), failure(port.failure) {}  // NOLINT(runtime/explicit)
+  MaybeProc(const OutputPort& port) : id(port.id), name(port.name), failure(port.failure) {}  // NOLINT(runtime/explicit)
 
   std::string id;
-  std::optional<std::string> name;
+  std::string name;
+  std::optional<ConnectionFailure> failure;
 };
 
 struct Conn {
@@ -139,6 +167,14 @@
     rpgs_ = std::move(rpgs);
     return *this;
   }
+  Group& With(std::vector<InputPort> input_ports) {
+    input_ports_ = std::move(input_ports);
+    return *this;
+  }
+  Group& With(std::vector<OutputPort> output_ports) {
+    output_ports_ = std::move(output_ports);
+    return *this;
+  }
   Lines serialize(bool is_root = true) const {
     Lines body;
     if (processors_.empty()) {
@@ -169,6 +205,22 @@
         body.append(subgroup.serialize(false).indentAll());
       }
     }
+    if (input_ports_.empty()) {
+      body.emplace_back("Input Ports: []");
+    } else {
+      body.emplace_back("Input Ports:");
+      for (const auto& port : input_ports_) {
+        body.append(port.serialize().indentAll());
+      }
+    }
+    if (output_ports_.empty()) {
+      body.emplace_back("Output Ports: []");
+    } else {
+      body.emplace_back("Output Ports:");
+      for (const auto& port : output_ports_) {
+        body.append(port.serialize().indentAll());
+      }
+    }
     Lines lines;
     if (is_root) {
       lines.emplace_back("Flow Controller:");
@@ -186,12 +238,15 @@
   std::vector<Proc> processors_;
   std::vector<Group> subgroups_;
   std::vector<RPG> rpgs_;
+  std::vector<InputPort> input_ports_;
+  std::vector<OutputPort> output_ports_;
 };
 
 struct ProcessGroupTestAccessor {
   FIELD_ACCESSOR(processors_)
   FIELD_ACCESSOR(connections_)
   FIELD_ACCESSOR(child_process_groups_)
+  FIELD_ACCESSOR(ports_)
 };
 
 template<typename T, typename = void>
@@ -222,6 +277,54 @@
   return nullptr;
 }
 
+void assertFailure(const Conn& expected, ConnectionFailure failure) {
+  auto assertMessage = [](const std::string& message) {
+    REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{1}, message));
+  };
+
+  switch (failure) {
+    case ConnectionFailure::UNRESOLVED_DESTINATION: {
+      assertMessage("Cannot find the destination processor with id '" + expected.destination.id + "' for the connection [name = '" + expected.name + "'");
+      break;
+    }
+    case ConnectionFailure::UNRESOLVED_SOURCE: {
+      assertMessage("Cannot find the source processor with id '" + expected.source.id + "' for the connection [name = '" + expected.name + "'");
+      break;
+    }
+    case ConnectionFailure::INPUT_CANNOT_BE_SOURCE: {
+      assertMessage("Input port [id = '" + expected.source.id + "'] cannot be a source outside the process group in the connection [name = '" + expected.name + "'");
+      break;
+    }
+    case ConnectionFailure::OUTPUT_CANNOT_BE_DESTINATION: {
+      assertMessage("Output port [id = '" + expected.destination.id + "'] cannot be a destination outside the process group in the connection [name = '" + expected.name + "'");
+      break;
+    }
+    case ConnectionFailure::INPUT_CANNOT_BE_DESTINATION: {
+      assertMessage("Input port [id = '" + expected.destination.id + "'] cannot be a destination inside the process group in the connection [name = '" + expected.name + "'");
+      break;
+    }
+    case ConnectionFailure::OUTPUT_CANNOT_BE_SOURCE: {
+      assertMessage("Output port [id = '" + expected.source.id + "'] cannot be a source inside the process group in the connection [name = '" + expected.name + "'");
+      break;
+    }
+  }
+}
+
+void verifyConnectionNode(minifi::Connection* conn, const Conn& expected) {
+  if (expected.source.failure) {
+    REQUIRE(conn->getSource() == nullptr);
+    assertFailure(expected, *expected.source.failure);
+  } else {
+    REQUIRE(conn->getSource()->getName() == expected.source.name);
+  }
+  if (expected.destination.failure) {
+    REQUIRE(conn->getDestination() == nullptr);
+    assertFailure(expected, *expected.destination.failure);
+  } else {
+    REQUIRE(conn->getDestination()->getName() == expected.destination.name);
+  }
+}
+
 void verifyProcessGroup(core::ProcessGroup& group, const Group& pattern) {
   // verify name
   REQUIRE(group.getName() == pattern.name_);
@@ -231,33 +334,34 @@
   for (auto& expected : pattern.connections_) {
     auto conn = findByName(connections, expected.name);
     REQUIRE(conn);
-    if (!expected.source.name) {
-      REQUIRE(conn->getSource() == nullptr);
-      REQUIRE(utils::verifyLogLinePresenceInPollTime(
-          std::chrono::seconds{1},
-          "Cannot find the source processor with id '" + expected.source.id
-          + "' for the connection [name = '" + expected.name + "'"));
-    } else {
-      REQUIRE(conn->getSource()->getName() == expected.source.name);
-    }
-    if (!expected.destination.name) {
-      REQUIRE(conn->getDestination() == nullptr);
-      REQUIRE(utils::verifyLogLinePresenceInPollTime(
-          std::chrono::seconds{1},
-          "Cannot find the destination processor with id '" + expected.destination.id
-          + "' for the connection [name = '" + expected.name + "'"));
-    } else {
-      REQUIRE(conn->getDestination()->getName() == expected.destination.name);
-    }
+    verifyConnectionNode(conn, expected);
   }
 
-  // verify processors
+  // verify processors and ports
   const auto& processors = ProcessGroupTestAccessor::get_processors_(group);
-  REQUIRE(processors.size() == pattern.processors_.size());
+  REQUIRE(processors.size() == pattern.processors_.size() + pattern.input_ports_.size() + pattern.output_ports_.size());
   for (auto& expected : pattern.processors_) {
     REQUIRE(findByName(processors, expected.name));
   }
 
+  for (auto& expected : pattern.input_ports_) {
+    REQUIRE(findByName(processors, expected.name));
+  }
+
+  for (auto& expected : pattern.output_ports_) {
+    REQUIRE(findByName(processors, expected.name));
+  }
+
+  const auto& ports = ProcessGroupTestAccessor::get_ports_(group);
+  REQUIRE(ports.size() == pattern.input_ports_.size() + pattern.output_ports_.size());
+  for (auto& expected : pattern.input_ports_) {
+    REQUIRE(findByName(ports, expected.name));
+  }
+
+  for (auto& expected : pattern.output_ports_) {
+    REQUIRE(findByName(ports, expected.name));
+  }
+
   std::set<core::ProcessGroup*> simple_subgroups;
   std::set<core::ProcessGroup*> rpg_subgroups;
   for (auto& subgroup : ProcessGroupTestAccessor::get_child_process_groups_(group)) {
diff --git a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
index 52e264d..0f58795 100644
--- a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -24,7 +24,7 @@
 
 static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>());
 
-TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser1]") {
+TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser]") {
   auto pattern = Group("root")
     .With({
       Conn{"Conn1",
@@ -48,7 +48,7 @@
   verifyProcessGroup(*root, pattern);
 }
 
-TEST_CASE("Nested process group is correctly parsed", "[YamlProcessGroupParser2]") {
+TEST_CASE("Nested process group is correctly parsed", "[YamlProcessGroupParser]") {
   auto pattern = Group("root")
     .With({Conn{"Conn1",
                 Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
@@ -71,7 +71,7 @@
   verifyProcessGroup(*root, pattern);
 }
 
-TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupParser3]") {
+TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupParser]") {
   TestController controller;
   LogTestController::getInstance().setTrace<core::YamlConfiguration>();
   Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
@@ -102,18 +102,18 @@
   }
 
   SECTION("Connecting processors in their child/parent group") {
-    Conn1.source = UnresolvedProc{Child1_Proc1.id};
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   SECTION("Connecting processors between their own and their child/parent group") {
     Conn1.source = Proc1;
-    Conn1.destination = UnresolvedProc{Child1_Port1.id};
+    Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
 
-    Child1_Conn1.source = UnresolvedProc{Port1.id};
+    Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
     Child1_Conn1.destination = Child1_Proc1;
   }
 
@@ -121,11 +121,171 @@
     Conn1.source = Proc1;
     Conn1.destination = Port1;
 
-    Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
-    Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+    Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+    Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
   }
 
   auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
 
   verifyProcessGroup(*root, pattern);
 }
+
+TEST_CASE("Processor can communicate with child process group's input port", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Child process group can provide input for root processor through output port", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"},
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Child process groups can communicate through ports", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"},
+                InputPort{"00000000-0000-0000-0000-000000000003", "Port2"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}),
+      Group("Child2")
+      .With({InputPort{"00000000-0000-0000-0000-000000000003", "Port2"}})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot communicate with child's nested process group", "[YamlProcessGroupParser]") {
+  Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
+  OutputPort Port1{"00000000-0000-0000-0000-000000000002", "Port1"};
+  InputPort Port2{"00000000-0000-0000-0000-000000000003", "Port2", ConnectionFailure::UNRESOLVED_DESTINATION};
+
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc1,
+                Port2}})
+    .With({Proc1})
+    .With({
+      Group("Child1")
+      .With({Port1})
+      .With({Group("Child2")
+            .With({Port2})})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port can be a connection's source and the output port can be a destination inside the process group", "[YamlProcessGroupParser7]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                InputPort{"00000000-0000-0000-0000-000000000001", "Port1"},
+                OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}}})
+    .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+    .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}});
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port cannot be a connection's destination inside the process group", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000002", "Proc1"},
+                InputPort{"00000000-0000-0000-0000-000000000001", "Port1", ConnectionFailure::INPUT_CANNOT_BE_DESTINATION}}})
+    .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+    .With({Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}});
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Output port cannot be a connection's source inside the process group", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                OutputPort{"00000000-0000-0000-0000-000000000001", "Port1", ConnectionFailure::OUTPUT_CANNOT_BE_SOURCE},
+                Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}}})
+    .With({OutputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+    .With({Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}});
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port can be a connection's source and the output port can be a destination inside the process group through processor", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                InputPort{"00000000-0000-0000-0000-000000000001", "Port1"},
+                Proc{"00000000-0000-0000-0000-000000000003", "Proc1"}},
+           Conn{"Conn2",
+                Proc{"00000000-0000-0000-0000-000000000003", "Proc1"},
+                OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000003", "Proc1"}})
+    .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+    .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}});
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot set connection's destination to child process group's output port", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+                OutputPort{"00000000-0000-0000-0000-000000000002", "Port1", ConnectionFailure::OUTPUT_CANNOT_BE_DESTINATION}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot set connection's source to child process group's input port", "[YamlProcessGroupParser]") {
+  auto pattern = Group("root")
+    .With({Conn{"Conn1",
+                InputPort{"00000000-0000-0000-0000-000000000002", "Port1", ConnectionFailure::INPUT_CANNOT_BE_SOURCE},
+                Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}}})
+    .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+    .With({
+      Group("Child1")
+      .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+    });
+
+  auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+  verifyProcessGroup(*root, pattern);
+}
diff --git a/libminifi/include/core/Funnel.h b/libminifi/include/ForwardingNode.h
similarity index 68%
rename from libminifi/include/core/Funnel.h
rename to libminifi/include/ForwardingNode.h
index 8c8e08c..d492d34 100644
--- a/libminifi/include/core/Funnel.h
+++ b/libminifi/include/ForwardingNode.h
@@ -17,35 +17,35 @@
  */
 #pragma once
 
-#include <memory>
 #include <string>
+#include <memory>
 #include <utility>
 
-#include "logging/LoggerFactory.h"
-#include "Processor.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/Processor.h"
 
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi {
 
-class Funnel final : public Processor {
+class ForwardingNode : public core::Processor {
  public:
-  Funnel(std::string name, const utils::Identifier& uuid) : Processor(std::move(name), uuid), logger_(logging::LoggerFactory<Funnel>::getLogger()) {}
-  explicit Funnel(std::string name) : Processor(std::move(name)), logger_(logging::LoggerFactory<Funnel>::getLogger()) {}
+  ForwardingNode(std::string name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) : Processor(std::move(name), uuid), logger_(std::move(logger)) {
+    strategy_ = core::SchedulingStrategy::EVENT_DRIVEN;
+  }
+  ForwardingNode(std::string name, std::shared_ptr<core::logging::Logger> logger) : Processor(std::move(name)), logger_(std::move(logger)) {}
 
   static auto properties() { return std::array<core::Property, 0>{}; }
   MINIFIAPI static const core::Relationship Success;
   static auto relationships() { return std::array{Success}; }
   MINIFIAPI static constexpr bool SupportsDynamicProperties = false;
   MINIFIAPI static constexpr bool SupportsDynamicRelationships = false;
-  MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
   MINIFIAPI static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
   void initialize() override;
 
   void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
 
  private:
-  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<core::logging::Logger> logger_;
 };
 
-}  // namespace org::apache::nifi::minifi::core
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/Funnel.h b/libminifi/include/Funnel.h
new file mode 100644
index 0000000..e17689f
--- /dev/null
+++ b/libminifi/include/Funnel.h
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+class Funnel final : public ForwardingNode {
+ public:
+  Funnel(std::string name, const utils::Identifier& uuid) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Funnel>::getLogger()) {}
+  explicit Funnel(std::string name) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Funnel>::getLogger()) {}
+
+  MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
+
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/Port.h b/libminifi/include/Port.h
new file mode 100644
index 0000000..65c358b
--- /dev/null
+++ b/libminifi/include/Port.h
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+enum class PortType {
+  INPUT,
+  OUTPUT
+};
+
+class Port final : public ForwardingNode {
+ public:
+  Port(std::string name, const utils::Identifier& uuid, PortType port_type) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+  Port(std::string name, PortType port_type) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+  PortType getPortType() const {
+    return port_type_;
+  }
+
+  MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ private:
+  PortType port_type_;
+};
+
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 76a0c92..96dd684 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -27,19 +27,21 @@
 #include <algorithm>
 #include <set>
 #include <utility>
+#include <tuple>
 
 #include "Processor.h"
-#include "Funnel.h"
 #include "Exception.h"
 #include "TimerDrivenSchedulingAgent.h"
 #include "EventDrivenSchedulingAgent.h"
 #include "CronDrivenSchedulingAgent.h"
+#include "Port.h"
 #include "core/logging/Logger.h"
 #include "controller/ControllerServiceNode.h"
 #include "controller/ControllerServiceMap.h"
 #include "utils/Id.h"
 #include "utils/BaseHTTPClient.h"
 #include "utils/CallBackTimer.h"
+#include "range/v3/algorithm/find_if.hpp"
 
 struct ProcessGroupTestAccessor;
 
@@ -164,18 +166,22 @@
     std::lock_guard<std::recursive_mutex> lock(mutex_);
     return parent_process_group_;
   }
-  // Add processor
-  void addProcessor(std::unique_ptr<Processor> processor);
-  // Add child processor group
+  [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+  void addPort(std::unique_ptr<Port> port);
   void addProcessGroup(std::unique_ptr<ProcessGroup> child);
-  // ! Add connections
   void addConnection(std::unique_ptr<Connection> connection);
-  // Generic find
+  const std::set<Port*>& getPorts() const {
+    return ports_;
+  }
+
+  Port* findPortById(const utils::Identifier& uuid) const;
+  Port* findChildPortById(const utils::Identifier& uuid) const;
+
   template <typename Fun>
   Processor* findProcessor(Fun condition, Traverse traverse) const {
     std::lock_guard<std::recursive_mutex> lock(mutex_);
-    const auto found = std::find_if(processors_.cbegin(), processors_.cend(), condition);
-    if (found != processors_.cend()) {
+    const auto found = ranges::find_if(processors_, condition);
+    if (found != ranges::end(processors_)) {
       return found->get();
     }
     for (const auto& processGroup : child_process_groups_) {
@@ -231,9 +237,10 @@
   int config_version_;
   // Process Group Type
   const ProcessGroupType type_;
-  // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
+  // Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port
   std::set<std::unique_ptr<Processor>> processors_;
   std::set<Processor*> failed_processors_;
+  std::set<Port*> ports_;
   std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
   // Connections between the processor inside the group;
   std::set<std::unique_ptr<Connection>> connections_;
@@ -259,6 +266,8 @@
   core::controller::ControllerServiceMap controller_service_map_;
 
  private:
+  static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid);
+
   // Mutex for protection
   mutable std::recursive_mutex mutex_;
   // Logger
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 18ff1ee..bf7f280 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
-#define LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
+#pragma once
 
 #include <memory>
 #include <optional>
@@ -37,11 +36,7 @@
 
 class YamlConfigurationTestAccessor;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 static constexpr char const* CONFIG_YAML_FLOW_CONTROLLER_KEY = "Flow Controller";
 static constexpr char const* CONFIG_YAML_PROCESSORS_KEY = "Processors";
@@ -50,6 +45,8 @@
 static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
 static constexpr char const* CONFIG_YAML_PROVENANCE_REPORT_KEY = "Provenance Reporting";
 static constexpr char const* CONFIG_YAML_FUNNELS_KEY = "Funnels";
+static constexpr char const* CONFIG_YAML_INPUT_PORTS_KEY = "Input Ports";
+static constexpr char const* CONFIG_YAML_OUTPUT_PORTS_KEY = "Output Ports";
 
 #define YAML_CONFIGURATION_USE_REGEX
 
@@ -262,6 +259,17 @@
   void parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent);
 
   /**
+   * Parses the Input/Output Ports section of a configuration YAML.
+   * The resulting ports are added to the parent ProcessGroup.
+   *
+   * @param node   the YAML::Node containing the Input/Output Ports section
+   *                 of the configuration YAML
+   * @param parent the root node of flow configuration to which
+   *                 to add the funnels that are parsed
+   */
+  void parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type);
+
+  /**
    * A helper function for parsing or generating optional id fields.
    *
    * In parsing YAML flow configurations for config schema v1, the
@@ -320,10 +328,4 @@
   void raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const;
 };
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/Funnel.cpp b/libminifi/src/ForwardingNode.cpp
similarity index 72%
rename from libminifi/src/core/Funnel.cpp
rename to libminifi/src/ForwardingNode.cpp
index 5d31194..c1bf0a1 100644
--- a/libminifi/src/core/Funnel.cpp
+++ b/libminifi/src/ForwardingNode.cpp
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-#include "core/Funnel.h"
+#include "ForwardingNode.h"
 #include "core/ProcessSession.h"
 
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi {
 
-const Relationship Funnel::Success("success", "FlowFiles are routed to success relationship");
+const core::Relationship ForwardingNode::Success("success", "FlowFiles are routed to success relationship");
 
-void Funnel::initialize() {
+void ForwardingNode::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void Funnel::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+void ForwardingNode::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
   logger_->log_trace("On trigger %s", getUUIDStr());
   std::shared_ptr<core::FlowFile> flow_file = session->get();
   if (!flow_file) {
@@ -35,4 +35,4 @@
   session->transfer(flow_file, Success);
 }
 
-}  // namespace org::apache::nifi::minifi::core
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index b7444ec..7922717 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -91,7 +91,7 @@
 }
 
 
-void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
+std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
   gsl_Expects(processor);
   const auto name = processor->getName();
   std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -101,6 +101,15 @@
   } else {
     logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
   }
+  return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+  auto [processor, inserted] = addProcessor(std::move(port));
+  if (inserted) {
+    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    ports_.insert(static_cast<Port*>(processor));
+  }
 }
 
 void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
@@ -323,6 +332,33 @@
   }
 }
 
+Port* ProcessGroup::findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) {
+  const auto found = ranges::find_if(ports, [&](auto port) {
+      utils::Identifier port_uuid = port->getUUID();
+      return port_uuid && uuid == port_uuid;
+    });
+  if (found != ranges::cend(ports)) {
+    return *found;
+  }
+  return nullptr;
+}
+
+Port* ProcessGroup::findPortById(const utils::Identifier& uuid) const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  return findPortById(ports_, uuid);
+}
+
+Port* ProcessGroup::findChildPortById(const utils::Identifier& uuid) const {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  for (const auto& processGroup : child_process_groups_) {
+    const auto& ports = processGroup->getPorts();
+    if (auto port = findPortById(ports, uuid)) {
+      return port;
+    }
+  }
+  return nullptr;
+}
+
 void ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
@@ -334,19 +370,52 @@
   auto& insertedConnection = *insertPos;
 
   logger_->log_debug("Add connection %s into process group %s", insertedConnection->getName(), name_);
-  // only allow connections between processors of the same process group
-  auto source = this->findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
+  // only allow connections between processors of the same process group or in/output ports of child process groups
+  // check input and output ports connection restrictions inside and outside a process group
+  Processor* source = findPortById(insertedConnection->getSourceUUID());
+  if (source && static_cast<Port*>(source)->getPortType() == PortType::OUTPUT) {
+    logger_->log_error("Output port [id = '%s'] cannot be a source inside the process group in the connection [name = '%s', id = '%s']",
+                       insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+    source = nullptr;
+  } else if (!source) {
+    source = findChildPortById(insertedConnection->getSourceUUID());
+    if (source && static_cast<Port*>(source)->getPortType() == PortType::INPUT) {
+      logger_->log_error("Input port [id = '%s'] cannot be a source outside the process group in the connection [name = '%s', id = '%s']",
+                          insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+      source = nullptr;
+    } else if (!source) {
+      source = findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
+      if (!source) {
+        logger_->log_error("Cannot find the source processor with id '%s' for the connection [name = '%s', id = '%s']",
+                          insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+      }
+    }
+  }
+
   if (source) {
     source->addConnection(insertedConnection.get());
-  } else {
-    logger_->log_error("Cannot find the source processor with id '%s' for the connection [name = '%s', id = '%s']",
-                       insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
   }
-  auto destination = this->findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
-  if (!destination) {
-    logger_->log_error("Cannot find the destination processor with id '%s' for the connection [name = '%s', id = '%s']",
+
+  Processor* destination = findPortById(insertedConnection->getDestinationUUID());
+  if (destination && static_cast<Port*>(destination)->getPortType() == PortType::INPUT) {
+    logger_->log_error("Input port [id = '%s'] cannot be a destination inside the process group in the connection [name = '%s', id = '%s']",
                        insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+    destination = nullptr;
+  } else if (!destination) {
+    destination = findChildPortById(insertedConnection->getDestinationUUID());
+    if (destination && static_cast<Port*>(destination)->getPortType() == PortType::OUTPUT) {
+      logger_->log_error("Output port [id = '%s'] cannot be a destination outside the process group in the connection [name = '%s', id = '%s']",
+                          insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+      destination = nullptr;
+    } else if (!destination) {
+      destination = findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
+      if (!destination) {
+        logger_->log_error("Cannot find the destination processor with id '%s' for the connection [name = '%s', id = '%s']",
+                          insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+      }
+    }
   }
+
   if (destination && destination != source) {
     destination->addConnection(insertedConnection.get());
   }
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index e1f23f3..0560d85 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -27,6 +27,7 @@
 #include "core/state/Value.h"
 #include "Defaults.h"
 #include "utils/TimeUtil.h"
+#include "Funnel.h"
 
 #ifdef YAML_CONFIGURATION_USE_REGEX
 #include "utils/RegexUtils.h"
@@ -93,6 +94,8 @@
   YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
   YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
   YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
+  YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
+  YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
   YAML::Node remoteProcessingGroupsNode = [&] {
     // assignment is not supported on invalid Yaml nodes
     YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
@@ -106,9 +109,8 @@
   parseProcessorNodeYaml(processorsNode, group.get());
   parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
   parseFunnelsYaml(funnelsNode, group.get());
-  // parse connections last to give feedback if the source and/or destination
-  // is not in the same process group
-  parseConnectionYaml(connectionsNode, group.get());
+  parsePorts(inputPortsNode, group.get(), PortType::INPUT);
+  parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
 
   if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
     for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
@@ -116,6 +118,10 @@
       group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
     }
   }
+
+  // parse connections last to give feedback if the source and/or destination processors
+  // is not in the same process group or input/output port connections are not allowed
+  parseConnectionYaml(connectionsNode, group.get());
   return group;
 }
 
@@ -773,7 +779,7 @@
       throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
     });
 
-    auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+    auto funnel = std::make_unique<Funnel>(name, uuid.value());
     logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
     funnel->setScheduledState(core::RUNNING);
     funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
@@ -781,6 +787,36 @@
   }
 }
 
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+  if (!parent) {
+    logger_->log_error("parsePorts: no parent group was provided");
+    return;
+  }
+  if (!node || !node.IsSequence()) {
+    return;
+  }
+
+  for (const auto& element : node) {
+    const auto port_node = element.as<YAML::Node>();
+
+    std::string id = getOrGenerateId(port_node);
+
+    // Default name to be same as ID
+    const auto name = port_node["name"].as<std::string>(id);
+
+    const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
+      logger_->log_debug("Incorrect port UUID format.");
+      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID format.");
+    });
+
+    auto port = std::make_unique<Port>(name, uuid.value(), port_type);
+    logger_->log_debug("Created port UUID %s and name %s", id, name);
+    port->setScheduledState(core::RUNNING);
+    port->setSchedulingStrategy(core::EVENT_DRIVEN);
+    parent->addPort(std::move(port));
+  }
+}
+
 void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const {
   const auto &component_properties = component.getProperties();
 
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index 5e246d8..9435be3 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -18,6 +18,7 @@
 
 #include "core/yaml/YamlConnectionParser.h"
 #include "core/yaml/CheckRequiredField.h"
+#include "Funnel.h"
 
 namespace org::apache::nifi::minifi::core::yaml {
 
@@ -41,8 +42,8 @@
   }
 
   auto& processor_ref = *processor;
-  if (typeid(minifi::core::Funnel) == typeid(processor_ref)) {
-    addNewRelationshipToConnection(minifi::core::Funnel::Success.getName(), connection);
+  if (typeid(minifi::Funnel) == typeid(processor_ref)) {
+    addNewRelationshipToConnection(minifi::Funnel::Success.getName(), connection);
   }
 }
 
diff --git a/libminifi/test/resources/TestProcessGroup.yml b/libminifi/test/resources/TestProcessGroup.yml
new file mode 100644
index 0000000..0b1ee75
--- /dev/null
+++ b/libminifi/test/resources/TestProcessGroup.yml
@@ -0,0 +1,69 @@
+MiNiFi Config Version: 3
+Flow Controller:
+  name: MiNiFi Flow
+Processors:
+- name: GenerateFlowFile
+  id: 4812b638-2f79-4dc2-9693-847a90399cbd
+  class: org.apache.nifi.minifi.processors.GenerateFlowFile
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 100 ms
+  penalization period: 1000 ms
+  Properties:
+    Batch Size: '1'
+    Data Format: Binary
+    File Size: 10 B
+    Unique FlowFiles: 'true'
+- name: Log attributes
+  id: 2ecd4bb4-b103-43fe-a45d-6a79b12da79b
+  class: org.apache.nifi.minifi.processors.LogAttribute
+  scheduling strategy: EVENT_DRIVEN
+  auto-terminated relationships list:
+  - success
+  Properties:
+    FlowFiles To Log: '0'
+Connections:
+- name: GenerateFlowFile/success/ProcessGroup
+  id: 492bc370-5d4c-4657-952f-3d6093147ad8
+  source id: 4812b638-2f79-4dc2-9693-847a90399cbd
+  source relationship names:
+  - success
+  destination id: 012fc536-3137-4360-be65-3e3b47e05941
+- name: ProcessGroup/success/LogAttribute
+  id: 12656e8e-0b91-4694-a2b7-3aa147574cd2
+  source id: 46dd8c65-8255-4980-8b7e-4381da00867a
+  source relationship names:
+  - success
+  destination id: 2ecd4bb4-b103-43fe-a45d-6a79b12da79b
+Controller Services: []
+Remote Process Groups: []
+Process Groups:
+  - id: 0a3aaf32-8574-4fa7-b720-84001f8dd71a
+    name: Update the attributes
+    Processors:
+      - id: 11624e01-baca-4590-bb9d-512ae2616615
+        name: UpdateAttribute
+        class: org.apache.nifi.minifi.processors.UpdateAttribute
+        scheduling strategy: EVENT_DRIVEN
+        auto-terminated relationships list:
+        - failure
+        Properties:
+          test_attribute: success
+    Input Ports:
+      - id: 012fc536-3137-4360-be65-3e3b47e05941
+        name: in
+    Output Ports:
+      - id: 46dd8c65-8255-4980-8b7e-4381da00867a
+        name: out
+    Connections:
+      - name: Input/success/UpdateAttribute
+        id: 2d33779c-2305-4e1a-88b8-1d2b6a9b134c
+        source id: 012fc536-3137-4360-be65-3e3b47e05941
+        source relationship names:
+        - success
+        destination id: 11624e01-baca-4590-bb9d-512ae2616615
+      - name: UpdateAttribute/success/Output
+        id: 5af95cc8-455a-4d5e-afbb-e16699407ed2
+        source id: 11624e01-baca-4590-bb9d-512ae2616615
+        source relationship names:
+        - success
+        destination id: 46dd8c65-8255-4980-8b7e-4381da00867a