blob: 30bdd9b383b378c8611a148acdcc92b7092fb60c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <map>
#include <memory>
#include <chrono>
#include "core/repository/VolatileContentRepository.h"
#include "core/ProcessGroup.h"
#include "core/RepositoryFactory.h"
#include "core/yaml/YamlConfiguration.h"
#include "TailFile.h"
#include "TestBase.h"
#include "Catch.h"
#include "utils/TestUtils.h"
#include "utils/StringUtils.h"
using namespace std::literals::chrono_literals;
TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
TestController test_controller;
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
SECTION("loading YAML without optional component IDs works") {
static const std::string CONFIG_YAML_WITHOUT_IDS =
R"(
MiNiFi Config Version: 1
Flow Controller:
name: MiNiFi Flow
comment:
Core Properties:
flow controller graceful shutdown period: 10 sec
flow service write delay interval: 500 ms
administrative yield duration: 30 sec
bored yield duration: 10 millis
FlowFile Repository:
partitions: 256
checkpoint interval: 2 mins
always sync: false
Swap:
threshold: 20000
in period: 5 sec
in threads: 1
out period: 5 sec
out threads: 4
Provenance Repository:
provenance rollover time: 1 min
Content Repository:
content claim max appendable size: 10 MB
content claim max flow files: 100
always sync: false
Component Status Repository:
buffer size: 1440
snapshot frequency: 1 min
Security Properties:
keystore: /tmp/ssl/localhost-ks.jks
keystore type: JKS
keystore password: localtest
key password: localtest
truststore: /tmp/ssl/localhost-ts.jks
truststore type: JKS
truststore password: localtest
ssl protocol: TLS
Sensitive Props:
key:
algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
provider: BC
Processors:
- name: TailFile
class: org.apache.nifi.processors.standard.TailFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 1 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list:
Properties:
File to Tail: logs/minifi-app.log
Rolling Filename Pattern: minifi-app*
Initial Start Position: Beginning of File
Connections:
- name: TailToS2S
source name: TailFile
source relationship name: success
destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61
max work queue size: 0
max work queue data size: 1 MB
flowfile expiration: 60 sec
queue prioritizer class: org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer
Remote Processing Groups:
- name: NiFi Flow
comment:
url: https://localhost:8090/nifi
timeout: 30 secs
yield period: 10 sec
Input Ports:
- id: 8644cbcc-a45c-40e0-964d-5e536e2ada61
name: tailed log
comments:
max concurrent tasks: 1
use compression: false
Provenance Reporting:
comment:
scheduling strategy: TIMER_DRIVEN
scheduling period: 30 sec
host: localhost
port name: provenance
port: 8090
port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56
url: https://localhost:8090/
originating url: http://${hostname(true)}:8081/nifi
use compression: true
timeout: 30 secs
batch size: 1000;
)";
std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("TailFile"));
utils::Identifier uuid = rootFlowConfig->findProcessorByName("TailFile")->getUUID();
REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
std::map<std::string, minifi::Connection*> connectionMap;
rootFlowConfig->getConnections(connectionMap);
REQUIRE(2 == connectionMap.size());
// This is a map of UUID->Connection, and we don't know UUID, so just going to loop over it
for (auto it : connectionMap) {
REQUIRE(it.second);
REQUIRE(!it.second->getUUIDStr().empty());
REQUIRE(it.second->getDestination());
REQUIRE(it.second->getSource());
REQUIRE(60s == it.second->getFlowExpirationDuration());
}
}
SECTION("missing required field in YAML throws exception") {
static const std::string CONFIG_YAML_NO_RPG_PORT_ID =
R"(
MiNiFi Config Version: 1
Flow Controller:
name: MiNiFi Flow
Processors: []
Connections: []
Remote Processing Groups:
- name: NiFi Flow
comment:
url: https://localhost:8090/nifi
timeout: 30 secs
yield period: 10 sec
Input Ports:
- name: tailed log
comments:
max concurrent tasks: 1
use compression: false
)";
std::istringstream configYamlStream(CONFIG_YAML_NO_RPG_PORT_ID);
REQUIRE_THROWS_AS(yamlConfig.getYamlRoot(configYamlStream), std::invalid_argument);
}
SECTION("Validated YAML property failure throws exception and logs invalid attribute name") {
static const std::string CONFIG_YAML_EMPTY_RETRY_ATTRIBUTE =
R"(
Flow Controller:
name: MiNiFi Flow
Processors:
- name: RetryFlowFile
class: org.apache.nifi.processors.standard.RetryFlowFile
scheduling strategy: TIMER_DRIVEN
scheduling period: 1 sec
auto-terminated relationships list:
Properties:
Retry Attribute: ""
Connections: []
Remote Processing Groups: []
Provenance Reporting:
)";
std::istringstream configYamlStream(CONFIG_YAML_EMPTY_RETRY_ATTRIBUTE);
REQUIRE_THROWS_AS(yamlConfig.getYamlRoot(configYamlStream), utils::internal::InvalidValueException);
REQUIRE(LogTestController::getInstance().contains("Invalid value was set for property 'Retry Attribute' creating component 'RetryFlowFile'"));
}
}
TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") {
TestController test_controller;
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
static const std::string TEST_CONFIG_YAML =
R"(
MiNiFi Config Version: 3
Flow Controller:
name: Simple TailFile To RPG
comment: ''
Core Properties:
flow controller graceful shutdown period: 10 sec
flow service write delay interval: 500 ms
administrative yield duration: 30 sec
bored yield duration: 10 millis
max concurrent threads: 1
variable registry properties: ''
FlowFile Repository:
partitions: 256
checkpoint interval: 2 mins
always sync: false
Swap:
threshold: 20000
in period: 5 sec
in threads: 1
out period: 5 sec
out threads: 4
Content Repository:
content claim max appendable size: 10 MB
content claim max flow files: 100
always sync: false
Provenance Repository:
provenance rollover time: 1 min
implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository
Component Status Repository:
buffer size: 1440
snapshot frequency: 1 min
Security Properties:
keystore: ''
keystore type: ''
keystore password: ''
key password: ''
truststore: ''
truststore type: ''
truststore password: ''
ssl protocol: ''
Sensitive Props:
key:
algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
provider: BC
Processors:
- id: b0c04f28-0158-1000-0000-000000000000
name: TailFile
class: org.apache.nifi.processors.standard.TailFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 1 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
File Location: Local
File to Tail: ./logs/minifi-app.log
Initial Start Position: Beginning of File
Rolling Filename Pattern:
tail-base-directory:
tail-mode: Single file
tailfile-lookup-frequency: 10 minutes
tailfile-maximum-age: 24 hours
tailfile-recursive-lookup: 'false'
tailfile-rolling-strategy: Fixed name
Controller Services: []
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: b0c0c3cc-0158-1000-0000-000000000000
name: TailFile/success/ac0e798c-0158-1000-0588-cda9b944e011
source id: b0c04f28-0158-1000-0000-000000000000
source relationship names:
- success
destination id: ac0e798c-0158-1000-0588-cda9b944e011
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
Remote Process Groups:
- id: b0c09ff0-0158-1000-0000-000000000000
name: ''
url: http://localhost:8080/nifi
comment: ''
timeout: 30 sec
yield period: 10 sec
transport protocol: WRONG
proxy host: ''
proxy port: ''
proxy user: ''
proxy password: ''
local network interface: ''
Input Ports:
- id: aca664f8-0158-1000-a139-92485891d349
name: test2
comment: ''
max concurrent tasks: 1
use compression: false
- id: ac0e798c-0158-1000-0588-cda9b944e011
name: test
comment: ''
max concurrent tasks: 1
use compression: false
Output Ports: []
NiFi Properties Overrides: {}
)";
std::istringstream configYamlStream(TEST_CONFIG_YAML);
REQUIRE_THROWS_AS(yamlConfig.getYamlRoot(configYamlStream), minifi::Exception);
}
TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") {
TestController test_controller;
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
static const std::string TEST_CONFIG_YAML =
R"(
MiNiFi Config Version: 3
Flow Controller:
name: Simple TailFile To RPG
comment: ''
Core Properties:
flow controller graceful shutdown period: 10 sec
flow service write delay interval: 500 ms
administrative yield duration: 30 sec
bored yield duration: 10 millis
max concurrent threads: 1
variable registry properties: ''
FlowFile Repository:
partitions: 256
checkpoint interval: 2 mins
always sync: false
Swap:
threshold: 20000
in period: 5 sec
in threads: 1
out period: 5 sec
out threads: 4
Content Repository:
content claim max appendable size: 10 MB
content claim max flow files: 100
always sync: false
Provenance Repository:
provenance rollover time: 1 min
implementation: org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository
Component Status Repository:
buffer size: 1440
snapshot frequency: 1 min
Security Properties:
keystore: ''
keystore type: ''
keystore password: ''
key password: ''
truststore: ''
truststore type: ''
truststore password: ''
ssl protocol: ''
Sensitive Props:
key:
algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL
provider: BC
Processors:
- id: b0c04f28-0158-1000-0000-000000000000
name: TailFile
class: org.apache.nifi.processors.standard.TailFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 1 sec
penalization period: 30 sec
yield period: 1 sec
run duration nanos: 0
auto-terminated relationships list: []
Properties:
File Location: Local
File to Tail: ./logs/minifi-app.log
Initial Start Position: Beginning of File
Rolling Filename Pattern:
tail-base-directory:
tail-mode: Single file
tailfile-lookup-frequency: 10 minutes
tailfile-maximum-age: 24 hours
tailfile-recursive-lookup: 'false'
tailfile-rolling-strategy: Fixed name
Controller Services: []
Process Groups: []
Input Ports: []
Output Ports: []
Funnels: []
Connections:
- id: b0c0c3cc-0158-1000-0000-000000000000
name: TailFile/success/ac0e798c-0158-1000-0588-cda9b944e011
source id: b0c04f28-0158-1000-0000-000000000000
source relationship names:
- success
destination id: ac0e798c-0158-1000-0588-cda9b944e011
max work queue size: 10000
max work queue data size: 1 GB
flowfile expiration: 0 sec
queue prioritizer class: ''
Remote Process Groups:
- id: b0c09ff0-0158-1000-0000-000000000000
name: ''
url: http://localhost:8080/nifi
comment: ''
timeout: 30 sec
yield period: 10 sec
transport protocol: RAW
proxy host: ''
proxy port: ''
proxy user: ''
proxy password: ''
local network interface: ''
Input Ports:
- id: aca664f8-0158-1000-a139-92485891d349
name: test2
comment: ''
max concurrent tasks: 1
use compression: false
- id: ac0e798c-0158-1000-0588-cda9b944e011
name: test
comment: ''
max concurrent tasks: 1
use compression: false
Output Ports: []
NiFi Properties Overrides: {}
)";
std::istringstream configYamlStream(TEST_CONFIG_YAML);
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("TailFile"));
utils::Identifier uuid = rootFlowConfig->findProcessorByName("TailFile")->getUUID();
REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("TailFile")->getUUIDStr().empty());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingStrategy());
REQUIRE(1 == rootFlowConfig->findProcessorByName("TailFile")->getMaxConcurrentTasks());
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriodNano());
REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod());
REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriodMsec());
REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano());
std::map<std::string, minifi::Connection*> connectionMap;
rootFlowConfig->getConnections(connectionMap);
REQUIRE(2 == connectionMap.size());
for (auto it : connectionMap) {
REQUIRE(it.second);
REQUIRE(!it.second->getUUIDStr().empty());
REQUIRE(it.second->getDestination());
REQUIRE(it.second->getSource());
REQUIRE(0s == it.second->getFlowExpirationDuration());
}
}
TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setTrace<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
name: Simple
Processors:
- name: PutFile
class: PutFile
Properties:
Dynamic Property: Bad
)";
std::istringstream configYamlStream(TEST_CONFIG_YAML);
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("PutFile"));
const utils::Identifier uuid = rootFlowConfig->findProcessorByName("PutFile")->getUUID();
REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("PutFile")->getUUIDStr().empty());
REQUIRE(LogTestController::getInstance().contains("[warning] Unable to set the dynamic property "
"Dynamic Property with value Bad"));
}
TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
name: Simple
Processors:
- name: XYZ
class: GetFile
Properties:
Input Directory: ""
Batch Size: 1
)";
std::istringstream configYamlStream(TEST_CONFIG_YAML);
bool caught_exception = false;
try {
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("GetFile"));
utils::Identifier uuid = rootFlowConfig->findProcessorByName("GetFile")->getUUID();
REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("GetFile")->getUUIDStr().empty());
} catch (const std::exception &e) {
caught_exception = true;
REQUIRE("Unable to parse configuration file for component named 'XYZ' because required property "
"'Input Directory' is not set [in 'Processors' section of configuration file]" == std::string(e.what()));
}
REQUIRE(caught_exception);
}
TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
logTestController.setDebug<core::Processor>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
static const std::string TEST_CONFIG_YAML = R"(
Flow Controller:
name: Simple
Processors:
- name: XYZ
class: GetFile
Properties:
Input Directory: "/"
Batch Size: 1
)";
std::istringstream configYamlStream(TEST_CONFIG_YAML);
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("XYZ"));
utils::Identifier uuid = rootFlowConfig->findProcessorByName("XYZ")->getUUID();
REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("XYZ")->getUUIDStr().empty());
}
class DummyComponent : public core::ConfigurableComponent {
public:
bool supportsDynamicProperties() const override {
return false;
}
bool supportsDynamicRelationships() const override {
return false;
}
bool canEdit() override {
return true;
}
};
TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
core::Property("Prop B", "Prop B desc", "val B", true, "", { "Prop A" }, { })
});
yamlConfig.validateComponentProperties(*component, "component A", "section A");
REQUIRE(true); // Expected to get here w/o any exceptions
}
TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "", false, "", { }, { }),
core::Property("Prop B", "Prop B desc", "val B", true, "", { "Prop A" }, { })
});
bool config_failed = false;
try {
yamlConfig.validateComponentProperties(*component, "component A", "section A");
} catch (const std::exception &e) {
config_failed = true;
REQUIRE("Unable to parse configuration file for component named 'component A' because property "
"'Prop B' depends on property 'Prop A' which is not set "
"[in 'section A' section of configuration file]" == std::string(e.what()));
}
REQUIRE(config_failed);
}
#ifdef YAML_CONFIGURATION_USE_REGEX
TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
core::Property("Prop B", "Prop B desc", "val B", true, "", { }, { { "Prop A", "^abcd.*$" } })
});
yamlConfig.validateComponentProperties(*component, "component A", "section A");
REQUIRE(true); // Expected to get here w/o any exceptions
}
TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
core::Property("Prop B", "Prop B desc", "val B", true, "^val.*$", { }, { })
});
yamlConfig.validateComponentProperties(*component, "component A", "section A");
REQUIRE(true); // Expected to get here w/o any exceptions
}
TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
core::Property("Prop B", "Prop B desc", "val B", true, "", { }, { { "Prop A", "^val.*$" } })
});
bool config_failed = false;
try {
yamlConfig.validateComponentProperties(*component, "component A", "section A");
} catch (const std::exception &e) {
config_failed = true;
REQUIRE("Unable to parse configuration file for component named 'component A' because "
"property 'Prop B' must not be set when the value of property 'Prop A' matches '^val.*$' "
"[in 'section A' section of configuration file]" == std::string(e.what()));
}
REQUIRE(config_failed);
}
TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") {
TestController test_controller;
LogTestController &logTestController = LogTestController::getInstance();
logTestController.setDebug<TestPlan>();
logTestController.setDebug<core::YamlConfiguration>();
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
const auto component = std::make_shared<DummyComponent>();
component->setSupportedProperties(std::array{
core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }),
core::Property("Prop B", "Prop B desc", "val B", true, "^notval.*$", { }, { })
});
bool config_failed = false;
try {
yamlConfig.validateComponentProperties(*component, "component A", "section A");
} catch (const std::exception &e) {
config_failed = true;
REQUIRE("Unable to parse configuration file for component named 'component A' because "
"property 'Prop B' does not match validation pattern '^notval.*$' "
"[in 'section A' section of configuration file]" == std::string(e.what()));
}
REQUIRE(config_failed);
}
#endif // YAML_CONFIGURATION_USE_REGEX
TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") {
TestController test_controller;
std::shared_ptr<core::Repository> testProvRepo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yamlConfig(testProvRepo, testFlowFileRepo, content_repo, streamFactory, configuration);
static const std::string CONFIG_YAML_WITH_FUNNEL =
R"(
MiNiFi Config Version: 3
Flow Controller:
name: root
comment: ''
Processors:
- id: 0eac51eb-d76c-4ba6-9f0c-351795b2d243
name: GenerateFlowFile1
class: org.apache.nifi.minifi.processors.GenerateFlowFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 10000 ms
Properties:
Batch Size: '1'
Custom Text: custom1
Data Format: Binary
File Size: 1 kB
Unique FlowFiles: 'true'
- id: 5ec49d9b-673d-4c6f-9108-ab4acce0c1dc
name: GenerateFlowFile2
class: org.apache.nifi.minifi.processors.GenerateFlowFile
max concurrent tasks: 1
scheduling strategy: TIMER_DRIVEN
scheduling period: 10000 ms
Properties:
Batch Size: '1'
Custom Text: other2
Data Format: Binary
File Size: 1 kB
Unique FlowFiles: 'true'
- id: 695658ba-5b6e-4c7d-9c95-5a980b622c1f
name: LogAttribute
class: org.apache.nifi.minifi.processors.LogAttribute
max concurrent tasks: 1
scheduling strategy: EVENT_DRIVEN
auto-terminated relationships list:
- success
Properties:
FlowFiles To Log: '0'
Funnels:
- id: 01a2f910-7050-41c1-8528-942764e7591d
Connections:
- id: 97c6bdfb-3909-499f-9ae5-011cbe8cadaf
name: 01a2f910-7050-41c1-8528-942764e7591d//LogAttribute
source id: 01a2f910-7050-41c1-8528-942764e7591d
source relationship names: []
destination id: 695658ba-5b6e-4c7d-9c95-5a980b622c1f
- id: 353e6bd5-5fca-494f-ae99-02572352c47a
name: GenerateFlowFile2/success/01a2f910-7050-41c1-8528-942764e7591d
source id: 5ec49d9b-673d-4c6f-9108-ab4acce0c1dc
source relationship names:
- success
destination id: 01a2f910-7050-41c1-8528-942764e7591d
- id: 9c02c302-eb4f-4aac-98ed-0f6720a4ff1b
name: GenerateFlowFile1/success/01a2f910-7050-41c1-8528-942764e7591d
source id: 0eac51eb-d76c-4ba6-9f0c-351795b2d243
source relationship names:
- success
destination id: 01a2f910-7050-41c1-8528-942764e7591d
Remote Process Groups: []
)";
std::istringstream configYamlStream(CONFIG_YAML_WITH_FUNNEL);
std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig.getYamlRoot(configYamlStream);
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("GenerateFlowFile1"));
REQUIRE(rootFlowConfig->findProcessorByName("GenerateFlowFile2"));
REQUIRE(rootFlowConfig->findProcessorById(utils::Identifier::parse("01a2f910-7050-41c1-8528-942764e7591d").value()));
std::map<std::string, minifi::Connection*> connectionMap;
rootFlowConfig->getConnections(connectionMap);
REQUIRE(6 == connectionMap.size());
for (auto it : connectionMap) {
REQUIRE(it.second);
REQUIRE(!it.second->getUUIDStr().empty());
REQUIRE(it.second->getDestination());
REQUIRE(it.second->getSource());
}
}
TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") {
TestController test_controller;
std::shared_ptr<core::Repository> test_prov_repo = core::createRepository("provenancerepository");
std::shared_ptr<core::Repository> test_flow_file_repo = core::createRepository("flowfilerepository");
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
core::YamlConfiguration yaml_config(test_prov_repo, test_flow_file_repo, content_repo, stream_factory, configuration);
for (char i = '1'; i <= '8'; ++i) {
DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") {
std::string config_yaml =
R"(
Flow Controller:
name: root
comment: ''
Processors:
- id: 00000000-0000-0000-0000-000000000001
name: GenerateFlowFile1
class: org.apache.nifi.minifi.processors.GenerateFlowFile
- id: 00000000-0000-0000-0000-000000000002
name: LogAttribute
class: org.apache.nifi.minifi.processors.LogAttribute
Funnels:
- id: 00000000-0000-0000-0000-000000000003
- id: 99999999-9999-9999-9999-999999999999
Connections:
- id: 00000000-0000-0000-0000-000000000004
name: 00000000-0000-0000-0000-000000000003//LogAttribute
source id: 00000000-0000-0000-0000-000000000003
source relationship names: []
destination id: 00000000-0000-0000-0000-000000000002
- id: 00000000-0000-0000-0000-000000000005
name: GenerateFlowFile1/success/00000000-0000-0000-0000-000000000003
source id: 00000000-0000-0000-0000-000000000001
source relationship names:
- success
destination id: 00000000-0000-0000-0000-000000000003
Remote Process Groups:
- id: 00000000-0000-0000-0000-000000000006
name: ''
url: http://localhost:8080/nifi
transport protocol: RAW
Input Ports:
- id: 00000000-0000-0000-0000-000000000007
name: test2
max concurrent tasks: 1
use compression: false
Output Ports: []
Controller Services:
- name: SSLContextService
id: 00000000-0000-0000-0000-000000000008
class: SSLContextService
)";
auto config_old = config_yaml;
utils::StringUtils::replaceAll(config_yaml, std::string("00000000-0000-0000-0000-00000000000") + i, "99999999-9999-9999-9999-999999999999");
std::istringstream config_yaml_stream(config_yaml);
REQUIRE_THROWS_WITH(yaml_config.getYamlRoot(config_yaml_stream), "General Operation: UUID 99999999-9999-9999-9999-999999999999 is duplicated in the flow configuration");
}
}
}