blob: 09da30ae0df8b5ac9c679cec71814ec0aa22944a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "OpcUaTestServer.h"
#include "unit/SingleProcessorTestController.h"
#include "include/fetchopc.h"
#include "unit/TestUtils.h"
namespace org::apache::nifi::minifi::test {
// Fetches "Simulator/Default/Device1" by path and expects four Int32 flow files on Success:
// INT1, INT2 and INT3 directly below Device1, followed by INT4 nested under INT3.
TEST_CASE("Test fetching using path node id", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  // The test server is constructed with 4841, the same number used as the port in the endpoint URL below.
  OpcUaTestServer server(4841);
  server.start();

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  auto processor = controller.getProcessor();
  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex())));

  const auto results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  const auto& fetched = results.at(FetchOPCProcessor::Success);
  REQUIRE(fetched.size() == 4);

  // Assertions shared by every fetched node; only the browse name, full path and content differ.
  const auto check_int32_node = [&controller](const auto& flow_file, const std::string& browse_name,
                                              const std::string& full_path, const std::string& content) {
    CHECK(flow_file->getAttribute("Browsename") == browse_name);
    CHECK(flow_file->getAttribute("Datasize") == "4");
    CHECK(flow_file->getAttribute("Full path") == full_path);
    CHECK(flow_file->getAttribute("NodeID"));
    CHECK(flow_file->getAttribute("NodeID type") == "numeric");
    CHECK(flow_file->getAttribute("Typename") == "Int32");
    CHECK(flow_file->getAttribute("Sourcetimestamp"));
    CHECK(controller.plan->getContent(flow_file) == content);
  };

  // The first three results are expected to be INT1..INT3, whose content equals their ordinal.
  for (size_t index = 0; index < 3; ++index) {
    const std::string ordinal = std::to_string(index + 1);
    check_int32_node(fetched[index], "INT" + ordinal, "Simulator/Default/Device1/INT" + ordinal, ordinal);
  }

  // The last result is the node nested below INT3.
  check_int32_node(fetched[3], "INT4", "Simulator/Default/Device1/INT3/INT4", "4");
}
// Fetches "Simulator/Default/Device1/INT3" by path with an explicit reference type per path step
// ("Organizes/Organizes/HasComponent") and expects INT3 followed by its child INT4 on Success.
TEST_CASE("Test fetching using custom reference type id path", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  OpcUaTestServer server(4841);
  server.start();

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  auto processor = controller.getProcessor();
  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1/INT3"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex())));
  REQUIRE(processor->setProperty(FetchOPCProcessor::PathReferenceTypes.name, "Organizes/Organizes/HasComponent"));

  const auto results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  const auto& fetched = results.at(FetchOPCProcessor::Success);
  REQUIRE(fetched.size() == 2);

  // Assertions shared by both fetched nodes; only the browse name, full path and content differ.
  const auto check_int32_node = [&controller](const auto& flow_file, const std::string& browse_name,
                                              const std::string& full_path, const std::string& content) {
    CHECK(flow_file->getAttribute("Browsename") == browse_name);
    CHECK(flow_file->getAttribute("Datasize") == "4");
    CHECK(flow_file->getAttribute("Full path") == full_path);
    CHECK(flow_file->getAttribute("NodeID"));
    CHECK(flow_file->getAttribute("NodeID type") == "numeric");
    CHECK(flow_file->getAttribute("Typename") == "Int32");
    CHECK(flow_file->getAttribute("Sourcetimestamp"));
    CHECK(controller.plan->getContent(flow_file) == content);
  };

  check_int32_node(fetched[0], "INT3", "Simulator/Default/Device1/INT3", "3");
  check_int32_node(fetched[1], "INT4", "Simulator/Default/Device1/INT3/INT4", "4");
}
// A four-segment node path configured with only two path reference types is expected to be
// rejected with a "Process Schedule Operation" exception when the processor is triggered.
TEST_CASE("Test missing path reference types", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  auto processor = controller.getProcessor();

  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1/INT3"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::PathReferenceTypes.name, "Organizes/Organizes"));

  REQUIRE_THROWS_WITH(controller.trigger(),
      "Process Schedule Operation: Path reference types must be provided for each node pair in the path!");
}
// Setting a Username together with an empty Password is expected to be rejected with a
// "Process Schedule Operation" exception when the processor is triggered.
TEST_CASE("Test username and password should both be provided", "[fetchopcprocessor]") {
  SingleProcessorTestController controller{minifi::test::utils::make_processor<processors::FetchOPCProcessor>("FetchOPCProcessor")};
  // Named after the processor actually under test (FetchOPCProcessor).
  auto fetch_opc_processor = controller.getProcessor();
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Username.name, "user"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Password.name, ""));
  REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Both or neither of Username and Password should be provided!");
}
// Setting a Certificate path together with an empty Key path is expected to be rejected with a
// "Process Schedule Operation" exception when the processor is triggered.
TEST_CASE("Test certificate path and key path should both be provided", "[fetchopcprocessor]") {
  SingleProcessorTestController controller{minifi::test::utils::make_processor<processors::FetchOPCProcessor>("FetchOPCProcessor")};
  // Named after the processor actually under test (FetchOPCProcessor).
  auto fetch_opc_processor = controller.getProcessor();
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath.name, "cert"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath.name, ""));
  REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: All or none of Certificate path and Key path should be provided!");
}
// Setting Certificate path and Key path without an Application URI is expected to be rejected
// with a "Process Schedule Operation" exception when the processor is triggered.
TEST_CASE("Test application uri should be provided if certificate is provided", "[fetchopcprocessor]") {
  SingleProcessorTestController controller{minifi::test::utils::make_processor<processors::FetchOPCProcessor>("FetchOPCProcessor")};
  // Named after the processor actually under test (FetchOPCProcessor).
  auto fetch_opc_processor = controller.getProcessor();
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath.name, "cert"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath.name, "key"));
  REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Application URI must be provided if Certificate path is provided!");
}
// A Certificate path of "/invalid/cert/path" is expected to be reported with a
// "Failed to load cert from path" exception when the processor is triggered.
TEST_CASE("Test certificate path must be valid", "[fetchopcprocessor]") {
  SingleProcessorTestController controller{minifi::test::utils::make_processor<processors::FetchOPCProcessor>("FetchOPCProcessor")};
  // Named after the processor actually under test (FetchOPCProcessor).
  auto fetch_opc_processor = controller.getProcessor();
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath.name, "/invalid/cert/path"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath.name, "key"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::ApplicationURI.name, "appuri"));
  REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load cert from path: /invalid/cert/path");
}
// With a readable certificate file but a Key path of "/invalid/key", triggering the processor is
// expected to throw a "Failed to load key from path" exception.
TEST_CASE("Test key path must be valid", "[fetchopcprocessor]") {
  SingleProcessorTestController controller{minifi::test::utils::make_processor<processors::FetchOPCProcessor>("FetchOPCProcessor")};
  // Named after the processor actually under test (FetchOPCProcessor).
  auto fetch_opc_processor = controller.getProcessor();
  const auto test_cert_path = controller.createTempDirectory() / "test_cert.pem";
  {
    std::ofstream cert_file(test_cert_path);
    cert_file << "test";
    cert_file.close();
    // Fail here if the fixture could not be opened, written or flushed; otherwise the test would
    // fail later with an unrelated exception-message mismatch.
    REQUIRE_FALSE(cert_file.fail());
  }
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath.name, test_cert_path.string()));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath.name, "/invalid/key"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::ApplicationURI.name, "appuri"));
  REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load key from path: /invalid/key");
}
// With the same readable file used as both certificate and key but a Trusted path of
// "/invalid/trusted", triggering the processor is expected to throw a
// "Failed to load trusted server certs from path" exception.
TEST_CASE("Test trusted certs path must be valid", "[fetchopcprocessor]") {
  SingleProcessorTestController controller{minifi::test::utils::make_processor<processors::FetchOPCProcessor>("FetchOPCProcessor")};
  // Named after the processor actually under test (FetchOPCProcessor).
  auto fetch_opc_processor = controller.getProcessor();
  const auto test_cert_path = controller.createTempDirectory() / "test_cert.pem";
  {
    std::ofstream cert_file(test_cert_path);
    cert_file << "test";
    cert_file.close();
    // Fail here if the fixture could not be opened, written or flushed; otherwise the test would
    // fail later with an unrelated exception-message mismatch.
    REQUIRE_FALSE(cert_file.fail());
  }
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::CertificatePath.name, test_cert_path.string()));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::KeyPath.name, test_cert_path.string()));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::TrustedPath.name, "/invalid/trusted"));
  REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::ApplicationURI.name, "appuri"));
  REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load trusted server certs from path: /invalid/trusted");
}
// With Lazy set to "On", the first trigger is expected to fetch all four nodes, while a second
// trigger with nothing changed on the server is expected to yield no flow files and to log that
// INT3 was skipped for lack of a new source timestamp.
TEST_CASE("Test no fetch result using lazy mode when no timestamps are changed", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  OpcUaTestServer server(4841);
  server.start();

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  // Debug logging is enabled for the processor; the final assertion searches the log for the skip message.
  LogTestController::getInstance().setDebug<FetchOPCProcessor>();

  auto processor = controller.getProcessor();
  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex())));
  REQUIRE(processor->setProperty(FetchOPCProcessor::Lazy.name, "On"));

  // First pass: every node is new.
  auto results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  REQUIRE(results.at(FetchOPCProcessor::Success).size() == 4);

  // Second pass: nothing changed on the server in between.
  results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  REQUIRE(results.at(FetchOPCProcessor::Success).empty());
  REQUIRE(LogTestController::getInstance().contains("Node Simulator/Default/Device1/INT3 has no new source timestamp, skipping"));
}
// With Lazy set to "On", after the timestamp of INT3 is updated on the server the second trigger
// is expected to return exactly one flow file, the one for INT3.
TEST_CASE("Test fetch for nodes with changed timestamps with lazy mode", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  OpcUaTestServer server(4841);
  server.start();

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  LogTestController::getInstance().setDebug<FetchOPCProcessor>();

  auto processor = controller.getProcessor();
  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex())));
  REQUIRE(processor->setProperty(FetchOPCProcessor::Lazy.name, "On"));

  // First pass: every node is new.
  auto results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  REQUIRE(results.at(FetchOPCProcessor::Success).size() == 4);

  // Touch only INT3 before the second pass.
  server.updateNodeTimestamp("Simulator/Default/Device1/INT3");

  results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  const auto& refetched = results.at(FetchOPCProcessor::Success);
  REQUIRE(refetched.size() == 1);
  CHECK(refetched[0]->getAttribute("Browsename") == "INT3");
}
// With Lazy set to "New Value", updating only the timestamp of INT3 is expected to produce no
// flow files on the second trigger and to log that INT3 was skipped for lack of a new value.
TEST_CASE("Test no fetch result using lazy new value mode when no values are changed", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  OpcUaTestServer server(4841);
  server.start();

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  // Debug logging is enabled for the processor; the final assertion searches the log for the skip message.
  LogTestController::getInstance().setDebug<FetchOPCProcessor>();

  auto processor = controller.getProcessor();
  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex())));
  REQUIRE(processor->setProperty(FetchOPCProcessor::Lazy.name, "New Value"));

  // First pass: every node is new.
  auto results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  REQUIRE(results.at(FetchOPCProcessor::Success).size() == 4);

  // Only the timestamp of INT3 is updated; no node value is changed.
  server.updateNodeTimestamp("Simulator/Default/Device1/INT3");

  results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  REQUIRE(results.at(FetchOPCProcessor::Success).empty());
  REQUIRE(LogTestController::getInstance().contains("Node Simulator/Default/Device1/INT3 has no new value, skipping"));
}
// With Lazy set to "New Value", after INT3 gets only a timestamp update and INT2 gets the value 42,
// the second trigger is expected to return exactly one flow file, the one for INT2.
TEST_CASE("Test fetching new values using lazy new value mode", "[fetchopcprocessor]") {
  using processors::FetchOPCProcessor;

  OpcUaTestServer server(4841);
  server.start();

  SingleProcessorTestController controller{minifi::test::utils::make_processor<FetchOPCProcessor>("FetchOPCProcessor")};
  LogTestController::getInstance().setDebug<FetchOPCProcessor>();

  auto processor = controller.getProcessor();
  REQUIRE(processor->setProperty(FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeIDType.name, "Path"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1"));
  REQUIRE(processor->setProperty(FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex())));
  REQUIRE(processor->setProperty(FetchOPCProcessor::Lazy.name, "New Value"));

  // First pass: every node is new.
  auto results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  REQUIRE(results.at(FetchOPCProcessor::Success).size() == 4);

  // INT3 gets only a timestamp update, INT2 gets a value update.
  server.updateNodeTimestamp("Simulator/Default/Device1/INT3");
  server.updateNodeValue("Simulator/Default/Device1/INT2", 42);

  results = controller.trigger();
  REQUIRE(results.at(FetchOPCProcessor::Failure).empty());
  const auto& refetched = results.at(FetchOPCProcessor::Success);
  REQUIRE(refetched.size() == 1);
  CHECK(refetched[0]->getAttribute("Browsename") == "INT2");
}
} // namespace org::apache::nifi::minifi::test