MINIFICPP-1962 Implement communication between process group through ports
Closes #1451
Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/extensions/standard-processors/tests/CMakeLists.txt b/extensions/standard-processors/tests/CMakeLists.txt
index 01e162d..784de18 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -97,8 +97,8 @@
endif()
add_test(NAME TailFileTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFile.yml" "${TEST_RESOURCES}/")
-
add_test(NAME TailFileCronTest COMMAND TailFileTest "${TEST_RESOURCES}/TestTailFileCron.yml" "${TEST_RESOURCES}/")
+add_test(NAME ProcessGroupTest COMMAND ProcessGroupTest "${TEST_RESOURCES}/TestProcessGroup.yml")
FOREACH(resourcefile ${RESOURCE_APPS})
get_filename_component(resourcefilename "${resourcefile}" NAME_WE)
diff --git a/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp b/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
new file mode 100644
index 0000000..5e83586
--- /dev/null
+++ b/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include <string>
+
+#include "core/logging/Logger.h"
+#include "FlowController.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+class ProcessGroupTestHarness : public IntegrationBase {
+ public:
+ ProcessGroupTestHarness() : IntegrationBase(2s) {
+ }
+
+ void testSetup() override {
+ LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+ LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+ LogTestController::getInstance().setTrace<minifi::processors::UpdateAttribute>();
+ }
+
+ void runAssertions() override {
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "key:test_attribute value:success"));
+ }
+};
+
+int main(int argc, char **argv) {
+ std::string test_file_location;
+ if (argc > 1) {
+ test_file_location = argv[1];
+ }
+
+ ProcessGroupTestHarness harness;
+ harness.run(test_file_location);
+
+ return 0;
+}
diff --git a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
index 58a0dac..bb9d167 100644
--- a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
+++ b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
@@ -60,9 +60,21 @@
}
};
+enum class ConnectionFailure {
+ UNRESOLVED_SOURCE,
+ UNRESOLVED_DESTINATION,
+ INPUT_CANNOT_BE_SOURCE,
+ OUTPUT_CANNOT_BE_DESTINATION,
+ INPUT_CANNOT_BE_DESTINATION,
+ OUTPUT_CANNOT_BE_SOURCE
+};
+
struct Proc {
+ Proc(std::string id, std::string name, const std::optional<ConnectionFailure>& failure = std::nullopt)
+ : id(std::move(id)), name(std::move(name)), failure(failure) {}
std::string id;
std::string name;
+ std::optional<ConnectionFailure> failure;
Lines serialize() const {
return {{
@@ -73,17 +85,33 @@
}
};
-struct UnresolvedProc {
- explicit UnresolvedProc(std::string id): id(std::move(id)) {}
+template<typename Tag>
+struct Port {
+ Port(std::string id, std::string name, const std::optional<ConnectionFailure>& failure = std::nullopt)
+ : id(std::move(id)), name(std::move(name)), failure(failure) {}
std::string id;
+ std::string name;
+ std::optional<ConnectionFailure> failure;
+
+ Lines serialize() const {
+ return {{
+ "- id: " + id,
+ " name: " + name
+ }};
+ }
};
+using InputPort = Port<struct InputTag>;
+using OutputPort = Port<struct OutputTag>;
+
struct MaybeProc {
- MaybeProc(const Proc& proc): id(proc.id), name(proc.name) {} // NOLINT
- MaybeProc(const UnresolvedProc& proc) : id(proc.id) {} // NOLINT
+ MaybeProc(const Proc& proc) : id(proc.id), name(proc.name), failure(proc.failure) {} // NOLINT(runtime/explicit)
+ MaybeProc(const InputPort& port) : id(port.id), name(port.name), failure(port.failure) {} // NOLINT(runtime/explicit)
+ MaybeProc(const OutputPort& port) : id(port.id), name(port.name), failure(port.failure) {} // NOLINT(runtime/explicit)
std::string id;
- std::optional<std::string> name;
+ std::string name;
+ std::optional<ConnectionFailure> failure;
};
struct Conn {
@@ -139,6 +167,14 @@
rpgs_ = std::move(rpgs);
return *this;
}
+ Group& With(std::vector<InputPort> input_ports) {
+ input_ports_ = std::move(input_ports);
+ return *this;
+ }
+ Group& With(std::vector<OutputPort> output_ports) {
+ output_ports_ = std::move(output_ports);
+ return *this;
+ }
Lines serialize(bool is_root = true) const {
Lines body;
if (processors_.empty()) {
@@ -169,6 +205,22 @@
body.append(subgroup.serialize(false).indentAll());
}
}
+ if (input_ports_.empty()) {
+ body.emplace_back("Input Ports: []");
+ } else {
+ body.emplace_back("Input Ports:");
+ for (const auto& port : input_ports_) {
+ body.append(port.serialize().indentAll());
+ }
+ }
+ if (output_ports_.empty()) {
+ body.emplace_back("Output Ports: []");
+ } else {
+ body.emplace_back("Output Ports:");
+ for (const auto& port : output_ports_) {
+ body.append(port.serialize().indentAll());
+ }
+ }
Lines lines;
if (is_root) {
lines.emplace_back("Flow Controller:");
@@ -186,12 +238,15 @@
std::vector<Proc> processors_;
std::vector<Group> subgroups_;
std::vector<RPG> rpgs_;
+ std::vector<InputPort> input_ports_;
+ std::vector<OutputPort> output_ports_;
};
struct ProcessGroupTestAccessor {
FIELD_ACCESSOR(processors_)
FIELD_ACCESSOR(connections_)
FIELD_ACCESSOR(child_process_groups_)
+ FIELD_ACCESSOR(ports_)
};
template<typename T, typename = void>
@@ -222,6 +277,54 @@
return nullptr;
}
+void assertFailure(const Conn& expected, ConnectionFailure failure) {
+ auto assertMessage = [](const std::string& message) {
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{1}, message));
+ };
+
+ switch (failure) {
+ case ConnectionFailure::UNRESOLVED_DESTINATION: {
+ assertMessage("Cannot find the destination processor with id '" + expected.destination.id + "' for the connection [name = '" + expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::UNRESOLVED_SOURCE: {
+ assertMessage("Cannot find the source processor with id '" + expected.source.id + "' for the connection [name = '" + expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::INPUT_CANNOT_BE_SOURCE: {
+ assertMessage("Input port [id = '" + expected.source.id + "'] cannot be a source outside the process group in the connection [name = '" + expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::OUTPUT_CANNOT_BE_DESTINATION: {
+ assertMessage("Output port [id = '" + expected.destination.id + "'] cannot be a destination outside the process group in the connection [name = '" + expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::INPUT_CANNOT_BE_DESTINATION: {
+ assertMessage("Input port [id = '" + expected.destination.id + "'] cannot be a destination inside the process group in the connection [name = '" + expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::OUTPUT_CANNOT_BE_SOURCE: {
+ assertMessage("Output port [id = '" + expected.source.id + "'] cannot be a source inside the process group in the connection [name = '" + expected.name + "'");
+ break;
+ }
+ }
+}
+
+void verifyConnectionNode(minifi::Connection* conn, const Conn& expected) {
+ if (expected.source.failure) {
+ REQUIRE(conn->getSource() == nullptr);
+ assertFailure(expected, *expected.source.failure);
+ } else {
+ REQUIRE(conn->getSource()->getName() == expected.source.name);
+ }
+ if (expected.destination.failure) {
+ REQUIRE(conn->getDestination() == nullptr);
+ assertFailure(expected, *expected.destination.failure);
+ } else {
+ REQUIRE(conn->getDestination()->getName() == expected.destination.name);
+ }
+}
+
void verifyProcessGroup(core::ProcessGroup& group, const Group& pattern) {
// verify name
REQUIRE(group.getName() == pattern.name_);
@@ -231,33 +334,34 @@
for (auto& expected : pattern.connections_) {
auto conn = findByName(connections, expected.name);
REQUIRE(conn);
- if (!expected.source.name) {
- REQUIRE(conn->getSource() == nullptr);
- REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1},
- "Cannot find the source processor with id '" + expected.source.id
- + "' for the connection [name = '" + expected.name + "'"));
- } else {
- REQUIRE(conn->getSource()->getName() == expected.source.name);
- }
- if (!expected.destination.name) {
- REQUIRE(conn->getDestination() == nullptr);
- REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1},
- "Cannot find the destination processor with id '" + expected.destination.id
- + "' for the connection [name = '" + expected.name + "'"));
- } else {
- REQUIRE(conn->getDestination()->getName() == expected.destination.name);
- }
+ verifyConnectionNode(conn, expected);
}
- // verify processors
+ // verify processors and ports
const auto& processors = ProcessGroupTestAccessor::get_processors_(group);
- REQUIRE(processors.size() == pattern.processors_.size());
+ REQUIRE(processors.size() == pattern.processors_.size() + pattern.input_ports_.size() + pattern.output_ports_.size());
for (auto& expected : pattern.processors_) {
REQUIRE(findByName(processors, expected.name));
}
+ for (auto& expected : pattern.input_ports_) {
+ REQUIRE(findByName(processors, expected.name));
+ }
+
+ for (auto& expected : pattern.output_ports_) {
+ REQUIRE(findByName(processors, expected.name));
+ }
+
+ const auto& ports = ProcessGroupTestAccessor::get_ports_(group);
+ REQUIRE(ports.size() == pattern.input_ports_.size() + pattern.output_ports_.size());
+ for (auto& expected : pattern.input_ports_) {
+ REQUIRE(findByName(ports, expected.name));
+ }
+
+ for (auto& expected : pattern.output_ports_) {
+ REQUIRE(findByName(ports, expected.name));
+ }
+
std::set<core::ProcessGroup*> simple_subgroups;
std::set<core::ProcessGroup*> rpg_subgroups;
for (auto& subgroup : ProcessGroupTestAccessor::get_child_process_groups_(group)) {
diff --git a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
index 52e264d..0f58795 100644
--- a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -24,7 +24,7 @@
static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr, std::make_shared<minifi::Configure>());
-TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser1]") {
+TEST_CASE("Root process group is correctly parsed", "[YamlProcessGroupParser]") {
auto pattern = Group("root")
.With({
Conn{"Conn1",
@@ -48,7 +48,7 @@
verifyProcessGroup(*root, pattern);
}
-TEST_CASE("Nested process group is correctly parsed", "[YamlProcessGroupParser2]") {
+TEST_CASE("Nested process group is correctly parsed", "[YamlProcessGroupParser]") {
auto pattern = Group("root")
.With({Conn{"Conn1",
Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
@@ -71,7 +71,7 @@
verifyProcessGroup(*root, pattern);
}
-TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupParser3]") {
+TEST_CASE("Cannot connect processors from different groups", "[YamlProcessGroupParser]") {
TestController controller;
LogTestController::getInstance().setTrace<core::YamlConfiguration>();
Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
@@ -102,18 +102,18 @@
}
SECTION("Connecting processors in their child/parent group") {
- Conn1.source = UnresolvedProc{Child1_Proc1.id};
- Conn1.destination = UnresolvedProc{Child1_Port1.id};
+ Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+ Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
- Child1_Conn1.source = UnresolvedProc{Proc1.id};
- Child1_Conn1.destination = UnresolvedProc{Port1.id};
+ Child1_Conn1.source = Proc{Proc1.id, Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+ Child1_Conn1.destination = Proc{Port1.id, Proc1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
}
SECTION("Connecting processors between their own and their child/parent group") {
Conn1.source = Proc1;
- Conn1.destination = UnresolvedProc{Child1_Port1.id};
+ Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
- Child1_Conn1.source = UnresolvedProc{Port1.id};
+ Child1_Conn1.source = Proc{Port1.id, Port1.name, ConnectionFailure::UNRESOLVED_SOURCE};
Child1_Conn1.destination = Child1_Proc1;
}
@@ -121,11 +121,171 @@
Conn1.source = Proc1;
Conn1.destination = Port1;
- Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
- Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+ Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name, ConnectionFailure::UNRESOLVED_SOURCE};
+ Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name, ConnectionFailure::UNRESOLVED_DESTINATION};
}
auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
verifyProcessGroup(*root, pattern);
}
+
+TEST_CASE("Processor can communicate with child process group's input port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Child process group can provide input for root processor through output port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"},
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Child process groups can communicate through ports", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"},
+ InputPort{"00000000-0000-0000-0000-000000000003", "Port2"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}),
+ Group("Child2")
+ .With({InputPort{"00000000-0000-0000-0000-000000000003", "Port2"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot communicate with child's nested process group", "[YamlProcessGroupParser]") {
+ Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
+ OutputPort Port1{"00000000-0000-0000-0000-000000000002", "Port1"};
+ InputPort Port2{"00000000-0000-0000-0000-000000000003", "Port2", ConnectionFailure::UNRESOLVED_DESTINATION};
+
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc1,
+ Port2}})
+ .With({Proc1})
+ .With({
+ Group("Child1")
+ .With({Port1})
+ .With({Group("Child2")
+ .With({Port2})})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port can be a connection's source and the output port can be a destination inside the process group", "[YamlProcessGroupParser7]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ InputPort{"00000000-0000-0000-0000-000000000001", "Port1"},
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}}})
+ .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port cannot be a connection's destination inside the process group", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc1"},
+ InputPort{"00000000-0000-0000-0000-000000000001", "Port1", ConnectionFailure::INPUT_CANNOT_BE_DESTINATION}}})
+ .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Output port cannot be a connection's source inside the process group", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ OutputPort{"00000000-0000-0000-0000-000000000001", "Port1", ConnectionFailure::OUTPUT_CANNOT_BE_SOURCE},
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}}})
+ .With({OutputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port can be a connection's source and the output port can be a destination inside the process group through processor", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ InputPort{"00000000-0000-0000-0000-000000000001", "Port1"},
+ Proc{"00000000-0000-0000-0000-000000000003", "Proc1"}},
+ Conn{"Conn2",
+ Proc{"00000000-0000-0000-0000-000000000003", "Proc1"},
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000003", "Proc1"}})
+ .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot set connection's destination to child process group's output port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port1", ConnectionFailure::OUTPUT_CANNOT_BE_DESTINATION}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot set connection's source to child process group's input port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ InputPort{"00000000-0000-0000-0000-000000000002", "Port1", ConnectionFailure::INPUT_CANNOT_BE_SOURCE},
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
diff --git a/libminifi/include/core/Funnel.h b/libminifi/include/ForwardingNode.h
similarity index 68%
rename from libminifi/include/core/Funnel.h
rename to libminifi/include/ForwardingNode.h
index 8c8e08c..d492d34 100644
--- a/libminifi/include/core/Funnel.h
+++ b/libminifi/include/ForwardingNode.h
@@ -17,35 +17,35 @@
*/
#pragma once
-#include <memory>
#include <string>
+#include <memory>
#include <utility>
-#include "logging/LoggerFactory.h"
-#include "Processor.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/Processor.h"
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi {
-class Funnel final : public Processor {
+class ForwardingNode : public core::Processor {
public:
- Funnel(std::string name, const utils::Identifier& uuid) : Processor(std::move(name), uuid), logger_(logging::LoggerFactory<Funnel>::getLogger()) {}
- explicit Funnel(std::string name) : Processor(std::move(name)), logger_(logging::LoggerFactory<Funnel>::getLogger()) {}
+ ForwardingNode(std::string name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger) : Processor(std::move(name), uuid), logger_(std::move(logger)) {
+ strategy_ = core::SchedulingStrategy::EVENT_DRIVEN;
+ }
+ ForwardingNode(std::string name, std::shared_ptr<core::logging::Logger> logger) : Processor(std::move(name)), logger_(std::move(logger)) {}
static auto properties() { return std::array<core::Property, 0>{}; }
MINIFIAPI static const core::Relationship Success;
static auto relationships() { return std::array{Success}; }
MINIFIAPI static constexpr bool SupportsDynamicProperties = false;
MINIFIAPI static constexpr bool SupportsDynamicRelationships = false;
- MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
MINIFIAPI static constexpr bool IsSingleThreaded = false;
- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void initialize() override;
void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
private:
- std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<core::logging::Logger> logger_;
};
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/Funnel.h b/libminifi/include/Funnel.h
new file mode 100644
index 0000000..e17689f
--- /dev/null
+++ b/libminifi/include/Funnel.h
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+class Funnel final : public ForwardingNode {
+ public:
+ Funnel(std::string name, const utils::Identifier& uuid) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Funnel>::getLogger()) {}
+ explicit Funnel(std::string name) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Funnel>::getLogger()) {}
+
+ MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
+
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/Port.h b/libminifi/include/Port.h
new file mode 100644
index 0000000..65c358b
--- /dev/null
+++ b/libminifi/include/Port.h
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+enum class PortType {
+ INPUT,
+ OUTPUT
+};
+
+class Port final : public ForwardingNode {
+ public:
+ Port(std::string name, const utils::Identifier& uuid, PortType port_type) : ForwardingNode(std::move(name), uuid, core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+ Port(std::string name, PortType port_type) : ForwardingNode(std::move(name), core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+ PortType getPortType() const {
+ return port_type_;
+ }
+
+ MINIFIAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ private:
+ PortType port_type_;
+};
+
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index 76a0c92..96dd684 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -27,19 +27,21 @@
#include <algorithm>
#include <set>
#include <utility>
+#include <tuple>
#include "Processor.h"
-#include "Funnel.h"
#include "Exception.h"
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "CronDrivenSchedulingAgent.h"
+#include "Port.h"
#include "core/logging/Logger.h"
#include "controller/ControllerServiceNode.h"
#include "controller/ControllerServiceMap.h"
#include "utils/Id.h"
#include "utils/BaseHTTPClient.h"
#include "utils/CallBackTimer.h"
+#include "range/v3/algorithm/find_if.hpp"
struct ProcessGroupTestAccessor;
@@ -164,18 +166,22 @@
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
- // Add processor
- void addProcessor(std::unique_ptr<Processor> processor);
- // Add child processor group
+ [[maybe_unused]] std::tuple<Processor*, bool> addProcessor(std::unique_ptr<Processor> processor);
+ void addPort(std::unique_ptr<Port> port);
void addProcessGroup(std::unique_ptr<ProcessGroup> child);
- // ! Add connections
void addConnection(std::unique_ptr<Connection> connection);
- // Generic find
+ const std::set<Port*>& getPorts() const {
+ return ports_;
+ }
+
+ Port* findPortById(const utils::Identifier& uuid) const;
+ Port* findChildPortById(const utils::Identifier& uuid) const;
+
template <typename Fun>
Processor* findProcessor(Fun condition, Traverse traverse) const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
- const auto found = std::find_if(processors_.cbegin(), processors_.cend(), condition);
- if (found != processors_.cend()) {
+ const auto found = ranges::find_if(processors_, condition);
+ if (found != ranges::end(processors_)) {
return found->get();
}
for (const auto& processGroup : child_process_groups_) {
@@ -231,9 +237,10 @@
int config_version_;
// Process Group Type
const ProcessGroupType type_;
- // Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port
+ // Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port
std::set<std::unique_ptr<Processor>> processors_;
std::set<Processor*> failed_processors_;
+ std::set<Port*> ports_;
std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::unique_ptr<Connection>> connections_;
@@ -259,6 +266,8 @@
core::controller::ControllerServiceMap controller_service_map_;
private:
+ static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid);
+
// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 18ff1ee..bf7f280 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -15,8 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
-#define LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
+#pragma once
#include <memory>
#include <optional>
@@ -37,11 +36,7 @@
class YamlConfigurationTestAccessor;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
static constexpr char const* CONFIG_YAML_FLOW_CONTROLLER_KEY = "Flow Controller";
static constexpr char const* CONFIG_YAML_PROCESSORS_KEY = "Processors";
@@ -50,6 +45,8 @@
static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups";
static constexpr char const* CONFIG_YAML_PROVENANCE_REPORT_KEY = "Provenance Reporting";
static constexpr char const* CONFIG_YAML_FUNNELS_KEY = "Funnels";
+static constexpr char const* CONFIG_YAML_INPUT_PORTS_KEY = "Input Ports";
+static constexpr char const* CONFIG_YAML_OUTPUT_PORTS_KEY = "Output Ports";
#define YAML_CONFIGURATION_USE_REGEX
@@ -262,6 +259,17 @@
void parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent);
/**
+ * Parses the Input/Output Ports section of a configuration YAML.
+ * The resulting ports are added to the parent ProcessGroup.
+ *
+ * @param node the YAML::Node containing the Input/Output Ports section
+ * of the configuration YAML
+ * @param parent the root node of flow configuration to which
+ * to add the funnels that are parsed
+ */
+ void parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type);
+
+ /**
* A helper function for parsing or generating optional id fields.
*
* In parsing YAML flow configurations for config schema v1, the
@@ -320,10 +328,4 @@
void raiseComponentError(const std::string &component_name, const std::string &yaml_section, const std::string &reason) const;
};
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/Funnel.cpp b/libminifi/src/ForwardingNode.cpp
similarity index 72%
rename from libminifi/src/core/Funnel.cpp
rename to libminifi/src/ForwardingNode.cpp
index 5d31194..c1bf0a1 100644
--- a/libminifi/src/core/Funnel.cpp
+++ b/libminifi/src/ForwardingNode.cpp
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-#include "core/Funnel.h"
+#include "ForwardingNode.h"
#include "core/ProcessSession.h"
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi {
-const Relationship Funnel::Success("success", "FlowFiles are routed to success relationship");
+const core::Relationship ForwardingNode::Success("success", "FlowFiles are routed to success relationship");
-void Funnel::initialize() {
+void ForwardingNode::initialize() {
setSupportedRelationships(relationships());
}
-void Funnel::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+void ForwardingNode::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, const std::shared_ptr<core::ProcessSession>& session) {
logger_->log_trace("On trigger %s", getUUIDStr());
std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
@@ -35,4 +35,4 @@
session->transfer(flow_file, Success);
}
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index b7444ec..7922717 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -91,7 +91,7 @@
}
-void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
+std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
gsl_Expects(processor);
const auto name = processor->getName();
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -101,6 +101,15 @@
} else {
logger_->log_debug("Not adding processor %s into process group %s, as it is already there", name, name_);
}
+ return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+ auto [processor, inserted] = addProcessor(std::move(port));
+ if (inserted) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ ports_.insert(static_cast<Port*>(processor));
+ }
}
void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
@@ -323,6 +332,33 @@
}
}
+Port* ProcessGroup::findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid) {
+ const auto found = ranges::find_if(ports, [&](auto port) {
+ utils::Identifier port_uuid = port->getUUID();
+ return port_uuid && uuid == port_uuid;
+ });
+ if (found != ranges::cend(ports)) {
+ return *found;
+ }
+ return nullptr;
+}
+
+Port* ProcessGroup::findPortById(const utils::Identifier& uuid) const {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ return findPortById(ports_, uuid);
+}
+
+Port* ProcessGroup::findChildPortById(const utils::Identifier& uuid) const {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ for (const auto& processGroup : child_process_groups_) {
+ const auto& ports = processGroup->getPorts();
+ if (auto port = findPortById(ports, uuid)) {
+ return port;
+ }
+ }
+ return nullptr;
+}
+
void ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -334,19 +370,52 @@
auto& insertedConnection = *insertPos;
logger_->log_debug("Add connection %s into process group %s", insertedConnection->getName(), name_);
- // only allow connections between processors of the same process group
- auto source = this->findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
+ // only allow connections between processors of the same process group or in/output ports of child process groups
+ // check input and output ports connection restrictions inside and outside a process group
+ Processor* source = findPortById(insertedConnection->getSourceUUID());
+ if (source && static_cast<Port*>(source)->getPortType() == PortType::OUTPUT) {
+ logger_->log_error("Output port [id = '%s'] cannot be a source inside the process group in the connection [name = '%s', id = '%s']",
+ insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+ source = nullptr;
+ } else if (!source) {
+ source = findChildPortById(insertedConnection->getSourceUUID());
+ if (source && static_cast<Port*>(source)->getPortType() == PortType::INPUT) {
+ logger_->log_error("Input port [id = '%s'] cannot be a source outside the process group in the connection [name = '%s', id = '%s']",
+ insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+ source = nullptr;
+ } else if (!source) {
+ source = findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
+ if (!source) {
+ logger_->log_error("Cannot find the source processor with id '%s' for the connection [name = '%s', id = '%s']",
+ insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+ }
+ }
+ }
+
if (source) {
source->addConnection(insertedConnection.get());
- } else {
- logger_->log_error("Cannot find the source processor with id '%s' for the connection [name = '%s', id = '%s']",
- insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
}
- auto destination = this->findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
- if (!destination) {
- logger_->log_error("Cannot find the destination processor with id '%s' for the connection [name = '%s', id = '%s']",
+
+ Processor* destination = findPortById(insertedConnection->getDestinationUUID());
+ if (destination && static_cast<Port*>(destination)->getPortType() == PortType::INPUT) {
+ logger_->log_error("Input port [id = '%s'] cannot be a destination inside the process group in the connection [name = '%s', id = '%s']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+ destination = nullptr;
+ } else if (!destination) {
+ destination = findChildPortById(insertedConnection->getDestinationUUID());
+ if (destination && static_cast<Port*>(destination)->getPortType() == PortType::OUTPUT) {
+ logger_->log_error("Output port [id = '%s'] cannot be a destination outside the process group in the connection [name = '%s', id = '%s']",
+ insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+ destination = nullptr;
+ } else if (!destination) {
+ destination = findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
+ if (!destination) {
+ logger_->log_error("Cannot find the destination processor with id '%s' for the connection [name = '%s', id = '%s']",
+ insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
+ }
+ }
}
+
if (destination && destination != source) {
destination->addConnection(insertedConnection.get());
}
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index e1f23f3..0560d85 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -27,6 +27,7 @@
#include "core/state/Value.h"
#include "Defaults.h"
#include "utils/TimeUtil.h"
+#include "Funnel.h"
#ifdef YAML_CONFIGURATION_USE_REGEX
#include "utils/RegexUtils.h"
@@ -93,6 +94,8 @@
YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
YAML::Node connectionsNode = yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
+ YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
+ YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
YAML::Node remoteProcessingGroupsNode = [&] {
// assignment is not supported on invalid Yaml nodes
YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
@@ -106,9 +109,8 @@
parseProcessorNodeYaml(processorsNode, group.get());
parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
parseFunnelsYaml(funnelsNode, group.get());
- // parse connections last to give feedback if the source and/or destination
- // is not in the same process group
- parseConnectionYaml(connectionsNode, group.get());
+ parsePorts(inputPortsNode, group.get(), PortType::INPUT);
+ parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it != childProcessGroupNodeSeq.end(); ++it) {
@@ -116,6 +118,10 @@
group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode, childProcessGroupNode));
}
}
+
+ // parse connections last to give feedback if the source and/or destination processors
+ // is not in the same process group or input/output port connections are not allowed
+ parseConnectionYaml(connectionsNode, group.get());
return group;
}
@@ -773,7 +779,7 @@
throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID format.");
});
- auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+ auto funnel = std::make_unique<Funnel>(name, uuid.value());
logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
funnel->setScheduledState(core::RUNNING);
funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
@@ -781,6 +787,36 @@
}
}
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType port_type) {
+ if (!parent) {
+ logger_->log_error("parsePorts: no parent group was provided");
+ return;
+ }
+ if (!node || !node.IsSequence()) {
+ return;
+ }
+
+ for (const auto& element : node) {
+ const auto port_node = element.as<YAML::Node>();
+
+ std::string id = getOrGenerateId(port_node);
+
+ // Default name to be same as ID
+ const auto name = port_node["name"].as<std::string>(id);
+
+ const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
+ logger_->log_debug("Incorrect port UUID format.");
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID format.");
+ });
+
+ auto port = std::make_unique<Port>(name, uuid.value(), port_type);
+ logger_->log_debug("Created port UUID %s and name %s", id, name);
+ port->setScheduledState(core::RUNNING);
+ port->setSchedulingStrategy(core::EVENT_DRIVEN);
+ parent->addPort(std::move(port));
+ }
+}
+
void YamlConfiguration::validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &yaml_section) const {
const auto &component_properties = component.getProperties();
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index 5e246d8..9435be3 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -18,6 +18,7 @@
#include "core/yaml/YamlConnectionParser.h"
#include "core/yaml/CheckRequiredField.h"
+#include "Funnel.h"
namespace org::apache::nifi::minifi::core::yaml {
@@ -41,8 +42,8 @@
}
auto& processor_ref = *processor;
- if (typeid(minifi::core::Funnel) == typeid(processor_ref)) {
- addNewRelationshipToConnection(minifi::core::Funnel::Success.getName(), connection);
+ if (typeid(minifi::Funnel) == typeid(processor_ref)) {
+ addNewRelationshipToConnection(minifi::Funnel::Success.getName(), connection);
}
}
diff --git a/libminifi/test/resources/TestProcessGroup.yml b/libminifi/test/resources/TestProcessGroup.yml
new file mode 100644
index 0000000..0b1ee75
--- /dev/null
+++ b/libminifi/test/resources/TestProcessGroup.yml
@@ -0,0 +1,69 @@
+MiNiFi Config Version: 3
+Flow Controller:
+ name: MiNiFi Flow
+Processors:
+- name: GenerateFlowFile
+ id: 4812b638-2f79-4dc2-9693-847a90399cbd
+ class: org.apache.nifi.minifi.processors.GenerateFlowFile
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 100 ms
+ penalization period: 1000 ms
+ Properties:
+ Batch Size: '1'
+ Data Format: Binary
+ File Size: 10 B
+ Unique FlowFiles: 'true'
+- name: Log attributes
+ id: 2ecd4bb4-b103-43fe-a45d-6a79b12da79b
+ class: org.apache.nifi.minifi.processors.LogAttribute
+ scheduling strategy: EVENT_DRIVEN
+ auto-terminated relationships list:
+ - success
+ Properties:
+ FlowFiles To Log: '0'
+Connections:
+- name: GenerateFlowFile/success/ProcessGroup
+ id: 492bc370-5d4c-4657-952f-3d6093147ad8
+ source id: 4812b638-2f79-4dc2-9693-847a90399cbd
+ source relationship names:
+ - success
+ destination id: 012fc536-3137-4360-be65-3e3b47e05941
+- name: ProcessGroup/success/LogAttribute
+ id: 12656e8e-0b91-4694-a2b7-3aa147574cd2
+ source id: 46dd8c65-8255-4980-8b7e-4381da00867a
+ source relationship names:
+ - success
+ destination id: 2ecd4bb4-b103-43fe-a45d-6a79b12da79b
+Controller Services: []
+Remote Process Groups: []
+Process Groups:
+ - id: 0a3aaf32-8574-4fa7-b720-84001f8dd71a
+ name: Update the attributes
+ Processors:
+ - id: 11624e01-baca-4590-bb9d-512ae2616615
+ name: UpdateAttribute
+ class: org.apache.nifi.minifi.processors.UpdateAttribute
+ scheduling strategy: EVENT_DRIVEN
+ auto-terminated relationships list:
+ - failure
+ Properties:
+ test_attribute: success
+ Input Ports:
+ - id: 012fc536-3137-4360-be65-3e3b47e05941
+ name: in
+ Output Ports:
+ - id: 46dd8c65-8255-4980-8b7e-4381da00867a
+ name: out
+ Connections:
+ - name: Input/success/UpdateAttribute
+ id: 2d33779c-2305-4e1a-88b8-1d2b6a9b134c
+ source id: 012fc536-3137-4360-be65-3e3b47e05941
+ source relationship names:
+ - success
+ destination id: 11624e01-baca-4590-bb9d-512ae2616615
+ - name: UpdateAttribute/success/Output
+ id: 5af95cc8-455a-4d5e-afbb-e16699407ed2
+ source id: 11624e01-baca-4590-bb9d-512ae2616615
+ source relationship names:
+ - success
+ destination id: 46dd8c65-8255-4980-8b7e-4381da00867a