blob: 3884d8cc5b8f63fcd679ed0aa364e73f76133e13 [file]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/flow/StructuredConnectionParser.h"
#include "core/yaml/YamlConfiguration.h"
#include "TailFile.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "unit/TestUtils.h"
#include "core/yaml/YamlNode.h"
#include "Connection.h"
using namespace std::literals::chrono_literals;
namespace {
using org::apache::nifi::minifi::core::flow::StructuredConnectionParser;
using org::apache::nifi::minifi::core::YamlConfiguration;
using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
using org::apache::nifi::minifi::core::YamlNode;
namespace flow = org::apache::nifi::minifi::core::flow;
TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") {
const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
SECTION("Source relationships are read") {
const auto connection = std::make_shared<minifi::ConnectionImpl>(nullptr, nullptr, "name");
std::string serialized_yaml;
std::set<org::apache::nifi::minifi::core::Relationship> expectations;
SECTION("Single relationship name") {
serialized_yaml = std::string { "source relationship name: success\n" };
expectations = { { "success", "" } };
}
SECTION("List of relationship names") {
serialized_yaml = std::string {
"source relationship names:\n"
"- success\n"
"- failure\n"
"- something_else\n" };
expectations = { { "success", "" }, { "failure", "" }, { "something_else", "" } };
}
YAML::Node yaml_node = YAML::Load(serialized_yaml);
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
yaml_connection_parser.configureConnectionSourceRelationships(*connection);
const std::set<core::Relationship>& relationships = connection->getRelationships();
REQUIRE(expectations == relationships);
}
SECTION("Queue size limits are read") {
YAML::Node yaml_node = YAML::Load(std::string {
"max work queue size: 231\n"
"max work queue data size: 12 MB\n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
REQUIRE(231 == yaml_connection_parser.getWorkQueueSize());
REQUIRE(12_MiB == yaml_connection_parser.getWorkQueueDataSize());
}
SECTION("Queue swap threshold is read") {
YAML::Node yaml_node = YAML::Load(std::string {
"swap threshold: 231\n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
REQUIRE(231 == yaml_connection_parser.getSwapThreshold());
}
SECTION("Source and destination names and uuids are read") {
const utils::Identifier expected_source_id = [&] {
auto proc = minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile_1");
auto uuid = proc->getUUID();
parent.addProcessor(std::move(proc));
return uuid;
}();
const utils::Identifier expected_destination_id = [&] {
auto proc = minifi::test::utils::make_processor<minifi::processors::TailFile>("TailFile_2");
auto uuid = proc->getUUID();
parent.addProcessor(std::move(proc));
return uuid;
}();
std::string serialized_yaml;
SECTION("Directly from configuration") {
serialized_yaml = std::string {
"source id: " + expected_source_id.to_string() + "\n"
"destination id: " + expected_destination_id.to_string() + "\n" };
}
SECTION("Using UUID as remote processing group id") {
serialized_yaml = std::string {
"source name: " + expected_source_id.to_string() + "\n"
"destination name: " + expected_destination_id.to_string() + "\n" };
}
SECTION("Via processor name lookup") {
serialized_yaml = std::string {
"source name: TailFile_1\n"
"destination name: TailFile_2\n" };
}
YAML::Node yaml_node = YAML::Load(serialized_yaml);
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUID());
REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUID());
}
SECTION("Flow file expiration is read") {
YAML::Node yaml_node = YAML::Load(std::string {
"flowfile expiration: 2 min\n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
REQUIRE(2min == yaml_connection_parser.getFlowFileExpiration());
}
SECTION("Drop empty value is read") {
SECTION("When config contains true value") {
YAML::Node yaml_node = YAML::Load(std::string {
"drop empty: true\n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
REQUIRE(true == yaml_connection_parser.getDropEmpty());
}
SECTION("When config contains false value") {
YAML::Node yaml_node = YAML::Load(std::string {
"drop empty: false\n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
REQUIRE(false == yaml_connection_parser.getDropEmpty());
}
}
SECTION("Errors are handled properly when configuration lines are missing") {
const auto connection = std::make_shared<minifi::ConnectionImpl>(nullptr, nullptr, "name");
SECTION("With empty configuration") {
YAML::Node yaml_node = YAML::Load(std::string(""));
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
CHECK_THROWS(StructuredConnectionParser(connection_node, "test_node", parent_ptr, logger));
}
SECTION("With a configuration that lists keys but has no assigned values") {
SECTION("Single relationship name left empty") {
YAML::Node yaml_node = YAML::Load(std::string {
"source name: \n"
"destination name: \n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
// This seems incorrect, but we do not want to ruin backward compatibility
CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationships(*connection));
}
SECTION("List of relationship names contains empty item") {
YAML::Node yaml_node = YAML::Load(std::string {
"source relationship names:\n"
"- \n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationships(*connection));
}
SECTION("Source and destination lookup from via id") {
YAML::Node yaml_node = YAML::Load(std::string {
"source id: \n"
"destination id: \n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK_THROWS(yaml_connection_parser.getSourceUUID());
CHECK_THROWS(yaml_connection_parser.getDestinationUUID());
}
SECTION("Source and destination lookup via name") {
YAML::Node yaml_node = YAML::Load(std::string {
"source name: \n"
"destination name: \n" });
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK_THROWS(yaml_connection_parser.getSourceUUID());
CHECK_THROWS(yaml_connection_parser.getDestinationUUID());
}
SECTION("Queue limits and configuration") {
YAML::Node yaml_node = YAML::Load(std::string {
"max work queue size: \n"
"max work queue data size: \n"
"swap threshold: \n"
"flowfile expiration: \n"
"drop empty: \n"});
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT == yaml_connection_parser.getWorkQueueSize());
CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE == yaml_connection_parser.getWorkQueueDataSize());
CHECK(0 == yaml_connection_parser.getSwapThreshold());
CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
CHECK_FALSE(yaml_connection_parser.getDropEmpty());
}
}
SECTION("With a configuration that has values of incorrect format") {
YAML::Node yaml_node = YAML::Load(std::string {
"max work queue size: 2 KB\n"
"max work queue data size: 10 Incorrect\n"
"flowfile expiration: 12\n"
"drop empty: sup\n"});
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
// This seems incorrect, but we do not want to ruin backward compatibility
CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSize());
CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSize());
CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpiration());
CHECK_NOTHROW(yaml_connection_parser.getDropEmpty());
}
SECTION("Known incorrect formats that behave strangely") {
YAML::Node yaml_node = YAML::Load(std::string {
"max work queue data size: 2 Baby Pandas (img, 20 MB) that are cared for by a group of 30 giraffes\n"
"flowfile expiration: 0\n"
"drop empty: NULL\n"});
flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)};
StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE == yaml_connection_parser.getWorkQueueDataSize());
CHECK(0s == yaml_connection_parser.getFlowFileExpiration());
CHECK_FALSE(yaml_connection_parser.getDropEmpty());
}
}
}
} // namespace