blob: 8b02e2df19ae70e37ce701c49b56ade5898a5011 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "unit/Catch.h"
#include "catch2/matchers/catch_matchers_string.hpp"
#include "unit/TestBase.h"
#include "../processors/ConsumeMQTT.h"
#include "core/Resource.h"
#include "unit/SingleProcessorTestController.h"
#include "rapidjson/document.h"
#include "unit/ProcessorUtils.h"
namespace org::apache::nifi::minifi::test {
void verifyXmlJsonResult(const std::string& json_content, size_t expected_record_count, bool add_attributes_as_fields) {
rapidjson::Document document;
document.Parse(json_content.c_str());
REQUIRE(document.IsArray());
REQUIRE(document.GetArray().Size() == expected_record_count);
for (size_t i = 0; i < expected_record_count; ++i) {
auto& current_record = document[gsl::narrow<rapidjson::SizeType>(i)];
REQUIRE(current_record.IsObject());
REQUIRE(current_record.HasMember("int_value"));
uint64_t int_result = current_record["int_value"].GetInt64();
CHECK(int_result == 42);
REQUIRE(current_record.HasMember("string_value"));
std::string string_result = current_record["string_value"].GetString();
CHECK(string_result == "test");
if (add_attributes_as_fields) {
string_result = current_record["_topic"].GetString();
CHECK(string_result == "mytopic/segment/" + std::to_string(i));
auto array = current_record["_topicSegments"].GetArray();
CHECK(array.Size() == 3);
string_result = array[0].GetString();
CHECK(string_result == "mytopic");
string_result = array[1].GetString();
CHECK(string_result == "segment");
string_result = array[2].GetString();
CHECK(string_result == std::to_string(i));
int_result = current_record["_qos"].GetInt64();
CHECK(int_result == i);
bool bool_result = current_record["_isDuplicate"].GetBool();
if (i == 0) {
CHECK_FALSE(bool_result);
} else {
CHECK(bool_result);
}
bool_result = current_record["_isRetained"].GetBool();
if (i == 0) {
CHECK_FALSE(bool_result);
} else {
CHECK(bool_result);
}
} else {
CHECK_FALSE(current_record.HasMember("_topic"));
CHECK_FALSE(current_record.HasMember("_qos"));
CHECK_FALSE(current_record.HasMember("_isDuplicate"));
CHECK_FALSE(current_record.HasMember("_isRetained"));
}
}
}
class TestConsumeMQTTProcessor : public minifi::processors::ConsumeMQTT {
public:
using SmartMessage = processors::AbstractMQTTProcessor::SmartMessage;
using MQTTMessageDeleter = processors::AbstractMQTTProcessor::MQTTMessageDeleter;
explicit TestConsumeMQTTProcessor(minifi::core::ProcessorMetadata metadata)
: minifi::processors::ConsumeMQTT(std::move(metadata)) {}
using ConsumeMQTT::enqueueReceivedMQTTMsg;
void initializeClient() override {
}
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override {
minifi::processors::ConsumeMQTT::onTriggerImpl(context, session);
}
};
REGISTER_RESOURCE(TestConsumeMQTTProcessor, Processor);
struct ConsumeMqttTestFixture {
ConsumeMqttTestFixture()
: test_controller_(utils::make_processor<TestConsumeMQTTProcessor>("TestConsumeMQTTProcessor")),
consume_mqtt_processor_(test_controller_.getProcessor()) {
REQUIRE(consume_mqtt_processor_ != nullptr);
LogTestController::getInstance().setDebug<TestConsumeMQTTProcessor>();
}
ConsumeMqttTestFixture(ConsumeMqttTestFixture&&) = delete;
ConsumeMqttTestFixture(const ConsumeMqttTestFixture&) = delete;
ConsumeMqttTestFixture& operator=(ConsumeMqttTestFixture&&) = delete;
ConsumeMqttTestFixture& operator=(const ConsumeMqttTestFixture&) = delete;
~ConsumeMqttTestFixture() {
LogTestController::getInstance().reset();
}
SingleProcessorTestController test_controller_;
core::Processor* consume_mqtt_processor_ = nullptr;
};
using namespace std::literals::chrono_literals;
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
Catch::Matchers::EndsWith("Expected valid value from \"TestConsumeMQTTProcessor::Topic\", but got PropertyNotSet (Property Error:2)"));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
Catch::Matchers::EndsWith("Expected valid value from \"TestConsumeMQTTProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
"by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 0s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithQoS0", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
"by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithID_V_5", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
"by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 0s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
"by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_CleanStart_V_3", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanStart.name, "true"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Clean Start. Property is not used.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_SessionExpiryInterval_V_3", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_CleanSession_V_5", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "0 s"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanSession.name, "true"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 5.0 specification does not support Clean Session. Property is not used.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_TopicAliasMaximum_V_3", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::TopicAliasMaximum.name, "1"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_ReceiveMaximum_V_3", "[consumeMQTTTest]") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::ReceiveMaximum.name, "1"));
REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Receive Maximum. Property is not used.", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read XML messages and write them to json records", "[consumeMQTTTest]") {
test_controller_.plan->addController("XMLReader", "XMLReader");
test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
bool add_attributes_as_fields = true;
SECTION("Add attributes as fields by default") {
}
SECTION("Do not add attributes as fields") {
add_attributes_as_fields = false;
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::AddAttributesAsFields.name, "false"));
}
const size_t expected_record_count = 2;
const std::string payload = R"(<root><int_value>42</int_value><string_value>test</string_value></root>)";
for (size_t i = 0; i < expected_record_count; ++i) {
TestConsumeMQTTProcessor::SmartMessage message{std::unique_ptr<MQTTAsync_message, TestConsumeMQTTProcessor::MQTTMessageDeleter>(
new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = gsl::narrow<int>(i), .payloadlen = gsl::narrow<int>(payload.size()),
.payload = const_cast<char*>(payload.data()), .qos = gsl::narrow<int>(i), .retained = gsl::narrow<int>(i), .dup = gsl::narrow<int>(i),
.msgid = gsl::narrow<int>(i + 1), .properties = {}}),
std::string{"mytopic/segment/" + std::to_string(i)}}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
auto& test_processor = dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
test_processor.enqueueReceivedMQTTMsg(std::move(message));
}
const auto trigger_results = test_controller_.trigger();
CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == 1);
const auto flow_file = trigger_results.at(TestConsumeMQTTProcessor::Success).at(0);
auto string_content = test_controller_.plan->getContent(flow_file);
verifyXmlJsonResult(string_content, expected_record_count, add_attributes_as_fields);
CHECK(*flow_file->getAttribute("record.count") == "2");
CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Invalid XML payload does not result in new flow files", "[consumeMQTTTest]") {
test_controller_.plan->addController("XMLReader", "XMLReader");
test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
const std::string payload = "invalid xml payload";
TestConsumeMQTTProcessor::SmartMessage message{
std::unique_ptr<MQTTAsync_message, TestConsumeMQTTProcessor::MQTTMessageDeleter>(
new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow<int>(payload.size()),
.payload = const_cast<char*>(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
std::string{"mytopic"}}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
auto& test_processor = dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
test_processor.enqueueReceivedMQTTMsg(std::move(message));
const auto trigger_results = test_controller_.trigger();
CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).empty());
REQUIRE(LogTestController::getInstance().contains("[error] Failed to read records from MQTT message", 1s));
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read MQTT message and write it to a flow file", "[consumeMQTTTest]") {
std::vector<std::string> expected_topic_segments;
std::string topic;
SECTION("Single topic segment") {
expected_topic_segments = {"mytopic"};
topic = "mytopic";
}
SECTION("Multiple topic segments") {
expected_topic_segments = {"my", "topic", "segment"};
topic = "my/topic/segment";
}
SECTION("Empty topic segment") {
expected_topic_segments = {"mytopic", "", "segment"};
topic = "mytopic//segment";
}
SECTION("Empty topic segment at the end") {
expected_topic_segments = {"mytopic", ""};
topic = "mytopic/";
}
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
const size_t expected_flow_file_count = 2;
const std::string payload = "test MQTT payload";
for (size_t i = 0; i < expected_flow_file_count; ++i) {
TestConsumeMQTTProcessor::SmartMessage message{std::unique_ptr<MQTTAsync_message, TestConsumeMQTTProcessor::MQTTMessageDeleter>(
new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow<int>(payload.size()),
.payload = const_cast<char*>(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
std::string{topic}}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
auto& test_processor = dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
test_processor.enqueueReceivedMQTTMsg(std::move(message));
}
const auto trigger_results = test_controller_.trigger();
CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == expected_flow_file_count);
for (size_t i = 0; i < expected_flow_file_count; ++i) {
const auto flow_file = trigger_results.at(TestConsumeMQTTProcessor::Success).at(i);
auto string_content = test_controller_.plan->getContent(flow_file);
CHECK(string_content == payload);
CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
CHECK(*flow_file->getAttribute("mqtt.topic") == topic);
for (size_t j = 0; j < expected_topic_segments.size(); ++j) {
CHECK(*flow_file->getAttribute("mqtt.topic.segment." + std::to_string(j)) == expected_topic_segments[j]);
}
CHECK(*flow_file->getAttribute("mqtt.qos") == "1");
CHECK(*flow_file->getAttribute("mqtt.isDuplicate") == "false");
CHECK(*flow_file->getAttribute("mqtt.isRetained") == "false");
}
}
TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if non-existent recordset reader or writer is set", "[consumeMQTTTest]") {
test_controller_.plan->addController("XMLReader", "XMLReader");
test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
SECTION("RecordReader is set to invalid controller service") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "invalid_reader"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("Controller service 'Record Reader' = 'invalid_reader' not found"));
}
SECTION("RecordWriter is set to invalid controller service") {
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "invalid_writer"));
REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("Controller service 'Record Writer' = 'invalid_writer' not found"));
}
}
} // namespace org::apache::nifi::minifi::test