blob: 6d0b80b310c71eba4bf33905b579a955021bcf7e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <mutex>
#include "integration/HTTPIntegrationBase.h"
#include "integration/HTTPHandlers.h"
#include "utils/gsl.h"
#include "unit/TestUtils.h"
#include "unit/EmptyFlow.h"
#include "fmt/format.h"
#include "spdlog/sinks/stdout_sinks.h"
#include "spdlog/sinks/ostream_sink.h"
#include "spdlog/sinks/dist_sink.h"
#include "properties/PropertiesFile.h"
#include "ConfigTestAccessor.h"
#include "unit/Catch.h"
namespace org::apache::nifi::minifi::test {
struct PropertyChange {
std::string name;
std::string value;
std::optional<bool> persist;
};
class C2HeartbeatHandler : public ServerAwareHandler {
public:
bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
std::lock_guard<std::mutex> lock(mutex_);
if (response_) {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
response_->length());
mg_printf(conn, "%s", response_->c_str());
} else {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
}
return true;
}
void setProperties(const std::vector<PropertyChange>& changes) {
std::vector<std::string> fields;
for (const auto& change : changes) {
if (change.persist.has_value()) {
fields.push_back(fmt::format(R"("{}": {{"value": "{}", "persist": {}}})", change.name, change.value, change.persist.value()));
} else {
fields.push_back(fmt::format(R"("{}": "{}")", change.name, change.value));
}
}
std::lock_guard<std::mutex> lock(mutex_);
response_ =
R"({
"operation" : "heartbeat",
"requested_operations": [{
"operation" : "update",
"operationid" : "79",
"name": "properties",
"args": {)" +
minifi::utils::string::join(", ", fields)
+ R"(}
}]
})";
}
private:
std::mutex mutex_;
std::optional<std::string> response_;
};
class VerifyPropertyUpdate : public HTTPIntegrationBase {
public:
VerifyPropertyUpdate() :fn_{[]{}} {}
explicit VerifyPropertyUpdate(std::function<void()> fn) : fn_(std::move(fn)) {}
VerifyPropertyUpdate(const VerifyPropertyUpdate&) = delete;
VerifyPropertyUpdate(VerifyPropertyUpdate&&) = default;
VerifyPropertyUpdate& operator=(const VerifyPropertyUpdate&) = delete;
VerifyPropertyUpdate& operator=(VerifyPropertyUpdate&&) = default;
~VerifyPropertyUpdate() override = default;
void testSetup() override {}
void runAssertions() override {
fn_();
}
std::function<void()> fn_;
[[nodiscard]] int getRestartRequestedCount() const noexcept { return restart_requested_count_; }
};
static const std::string properties_file =
"nifi.property.one=tree\n"
"nifi.c2.enable=true\n"
"nifi.c2.agent.class=test\n"
"nifi.c2.agent.heartbeat.period=500\n";
static const std::string log_properties_file =
"logger.root=INFO,ostream\n";
using namespace std::literals::chrono_literals;
struct DummyClass1 {};
struct DummyClass2 {};
namespace dummy {
struct DummyClass3 {};
} // namespace dummy
TEST_CASE("C2PropertiesUpdateTests", "[c2test]") {
TempDirectory tmp_dir;
std::filesystem::path home_dir = tmp_dir.getPath();
minifi::utils::file::PathUtils::create_dir(home_dir / "conf");
std::ofstream{home_dir / "conf/minifi.properties"} << properties_file;
std::ofstream{home_dir / "conf/minifi-log.properties"} << log_properties_file;
std::ofstream{home_dir / "conf/config.yml"} << empty_flow;
C2HeartbeatHandler hb_handler{};
C2AcknowledgeHandler ack_handler{};
auto logger_properties = std::make_shared<core::logging::LoggerProperties>();
// this sets the ostream logger
auto log_test_controller = LogTestController::getInstance(logger_properties);
logger_properties->setHome(home_dir);
logger_properties->loadConfigureFile("conf/minifi-log.properties");
core::logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
auto logger1 = core::logging::LoggerFactory<DummyClass1>::getLogger();
auto logger2 = core::logging::LoggerFactory<DummyClass2>::getLogger();
auto logger3 = core::logging::LoggerFactory<dummy::DummyClass3>::getLogger();
{
// verify initial log levels, none of these should be logged
logger1->log_debug("DummyClass1::before");
logger2->log_debug("DummyClass2::before");
logger3->log_debug("DummyClass3::before");
REQUIRE(!log_test_controller->contains("DummyClass1::before", 0s));
REQUIRE(!log_test_controller->contains("DummyClass2::before", 0s));
REQUIRE(!log_test_controller->contains("DummyClass3::before", 0s));
}
// On msvc, the passed lambda can't capture a reference to the object under construction, so we need to late-init harness.
VerifyPropertyUpdate harness;
harness = VerifyPropertyUpdate([&] {
REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {return ack_handler.isAcknowledged("79");}));
REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("FULLY_APPLIED") == 1;
}));
// Updating the same property will result in a no operation response
REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("NO_OPERATION") > 0;
}));
// Change the update response to 1 invalid and 1 valid value update
hb_handler.setProperties({{minifi::Configuration::nifi_c2_rest_heartbeat_minimize_updates, "banana", true}, {minifi::Configuration::minifi_disk_space_watchdog_enable, "true", true}});
// Due to 1 invalid value the result will be partially applied
REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("PARTIALLY_APPLIED") == 1;
}));
// Repeating the previous update request results in 1 no operation and 1 failure which results in not applied response
REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {
return ack_handler.getApplyCount("NOT_APPLIED") > 0
&& harness.getRestartRequestedCount() == 2;
}));
// update operation acknowledged
{
// verify final log levels
logger1->log_debug("DummyClass1::after");
logger2->log_debug("DummyClass2::after"); // this should still not log
logger3->log_debug("DummyClass3::after");
}
REQUIRE(log_test_controller->contains("DummyClass1::after", 0s));
REQUIRE(!log_test_controller->contains("DummyClass2::after", 0s));
REQUIRE(log_test_controller->contains("DummyClass3::after", 0s));
{
minifi::PropertiesFile minifi_properties(std::ifstream{home_dir / "conf/minifi.properties"});
REQUIRE(!minifi_properties.hasValue("nifi.dummy.property"));
REQUIRE(minifi_properties.getValue("nifi.property.one") == "bush");
REQUIRE(minifi_properties.getValue("nifi.property.two") == "ring");
REQUIRE(!minifi_properties.hasValue(minifi::Configuration::nifi_c2_rest_heartbeat_minimize_updates));
REQUIRE(minifi_properties.getValue(minifi::Configuration::minifi_disk_space_watchdog_enable) == "true");
}
{
minifi::PropertiesFile minifi_log_properties(std::ifstream{home_dir / "conf/minifi-log.properties"});
REQUIRE(!minifi_log_properties.hasValue("logger.org::apache::nifi::minifi::test::dummy"));
REQUIRE(minifi_log_properties.getValue("logger.org::apache::nifi::minifi::test::DummyClass1") == "DEBUG,ostream");
}
});
harness.getConfiguration()->setHome(home_dir.string());
harness.getConfiguration()->loadConfigureFile("conf/minifi.properties");
ConfigTestAccessor::call_setLoggerProperties(harness.getConfiguration(), logger_properties);
harness.setUrl("http://localhost:0/heartbeat", &hb_handler);
harness.setUrl("http://localhost:0/acknowledge", &ack_handler);
harness.setC2Url("/heartbeat", "/acknowledge");
hb_handler.setProperties({
{"nifi.dummy.property", "banana", false},
{"nifi.property.one", "bush", std::nullopt}, // default persist = true
{"nifi.property.two", "ring", true},
{"nifi.log.logger.org::apache::nifi::minifi::test::dummy", "DEBUG,ostream", false},
{"nifi.log.logger.org::apache::nifi::minifi::test::DummyClass1", "DEBUG,ostream", true}
});
harness.run((home_dir / "conf/config.yml").string());
}
} // namespace org::apache::nifi::minifi::test