// blob: bd0149ca09233963bcb2c944170f00eeabb00f5b [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <memory>
#include <utility>
#include <string>
#include <filesystem>
#include <fstream>
#include "range/v3/algorithm/find.hpp"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "core/Processor.h"
#include "Controller.h"
#include "c2/ControllerSocketProtocol.h"
#include "unit/TestUtils.h"
#include "c2/ControllerSocketMetricsPublisher.h"
#include "controllers/SSLContextService.h"
#include "utils/StringUtils.h"
#include "state/UpdateController.h"
#include "core/state/nodes/ResponseNodeLoader.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
/**
 * Minimal StateController used by the tests in this file.
 *
 * It only tracks a running flag: start() sets it, stop() clears it and
 * isRunning() reports it. pause() and resume() do nothing and return 0.
 */
class TestStateController : public minifi::state::StateController {
 public:
  TestStateController() = default;

  std::string getComponentName() const override {
    return "TestStateController";
  }

  // Every instance reports the same fixed identifier, parsed on first use.
  minifi::utils::Identifier getComponentUUID() const override {
    static const auto fixed_uuid = minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
    return fixed_uuid;
  }

  int16_t start() override {
    is_running.store(true);
    return 0;
  }

  int16_t stop() override {
    is_running.store(false);
    return 0;
  }

  bool isRunning() const override {
    return is_running.load();
  }

  int16_t pause() override {
    return 0;
  }

  int16_t resume() override {
    return 0;
  }

  // Starts out as "not running".
  std::atomic<bool> is_running{false};
};
/**
 * BackTrace with a helper that fills it with predictable, numbered lines.
 */
class TestBackTrace : public BackTrace {
 public:
  using BackTrace::BackTrace;

  // Appends line_count lines of the form "bt line <n> for <name>", with n counting up from 1.
  void addTraceLines(uint32_t line_count) {
    for (uint32_t line_number = 1; line_number <= line_count; ++line_number) {
      std::string text = "bt line ";
      text += std::to_string(line_number);
      text += " for ";
      text += getName();
      addLine(std::move(text));
    }
  }
};
class TestUpdateSink : public minifi::state::StateMonitor {
public:
explicit TestUpdateSink(std::shared_ptr<StateController> controller)
: is_running(true),
clear_calls(0),
controller(std::move(controller)),
update_calls(0) {
}
void executeOnComponent(const std::string&, std::function<void(minifi::state::StateController&)> func) override {
func(*controller);
}
void executeOnAllComponents(std::function<void(minifi::state::StateController&)> func) override {
func(*controller);
}
std::string getComponentName() const override {
return "TestUpdateSink";
}
minifi::utils::Identifier getComponentUUID() const override {
static auto dummyUUID = minifi::utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
return dummyUUID;
}
int16_t start() override {
is_running = true;
return 0;
}
int16_t stop() override {
is_running = false;
return 0;
}
bool isRunning() const override {
return is_running;
}
int16_t pause() override {
return 0;
}
int16_t resume() override {
return 0;
}
std::vector<BackTrace> getTraces() override {
std::vector<BackTrace> traces;
TestBackTrace trace1("trace1");
trace1.addTraceLines(2);
traces.push_back(trace1);
TestBackTrace trace2("trace2");
trace2.addTraceLines(3);
traces.push_back(trace2);
return traces;
}
int16_t drainRepositories() override {
return 0;
}
std::map<std::string, std::unique_ptr<minifi::io::InputStream>> getDebugInfo() override {
return {};
}
int16_t clearConnection(const std::string& /*connection*/) override {
clear_calls++;
return 0;
}
std::vector<std::string> getSupportedConfigurationFormats() const override {
return {};
}
nonstd::expected<void, std::string> applyUpdate(const std::string& /*source*/, const std::string& /*configuration*/, bool /*persist*/ = false,
const std::optional<std::string>& /*flow_id*/ = std::nullopt) override {
update_calls++;
return {};
}
uint64_t getUptime() override {
return 8765309;
}
std::atomic<bool> is_running;
std::atomic<uint32_t> clear_calls;
std::shared_ptr<StateController> controller;
std::atomic<uint32_t> update_calls;
};
class TestControllerSocketReporter : public c2::ControllerSocketReporter {
std::unordered_map<std::string, ControllerSocketReporter::QueueSize> getQueueSizes() override {
return {
{"con1", {1, 2}},
{"con2", {3, 3}}
};
}
std::unordered_set<std::string> getFullConnections() override {
return {"con2"};
}
std::unordered_set<std::string> getConnections() override {
return {"con1", "con2"};
}
std::string getAgentManifest() override {
return "testAgentManifest";
}
void setRoot(core::ProcessGroup* /*root*/) override {
}
void setFlowStatusDependencies(core::BulletinStore* /*bulletin_store*/, const std::filesystem::path& /*flowfile_repo_dir*/, const std::filesystem::path& /*flowfile_repo_dir*/) override {
}
std::string getFlowStatus(const std::vector<c2::FlowStatusRequest>& /*requests*/) override {
return "";
}
};
/**
 * Shared fixture for the controller socket tests.
 *
 * It owns the configuration handed to ControllerSocketProtocol, a TestStateController
 * wrapped by a TestUpdateSink, an enabled SSLContextService built from that configuration,
 * and the SocketData (localhost:9997) that the tests pass to the minifi::controller functions.
 */
class ControllerTestFixture {
 public:
  enum class ConnectionType {
    UNSECURE,
    SSL_FROM_CONFIGURATION
  };

  ControllerTestFixture()
      : configuration_(std::make_shared<minifi::ConfigureImpl>()),
        controller_(std::make_shared<TestStateController>()),
        update_sink_(std::make_unique<TestUpdateSink>(controller_)) {
    configuration_->set(minifi::Configure::controller_socket_host, "localhost");
    configuration_->set(minifi::Configure::controller_socket_port, "9997");

    // The certificate material is looked up in the "resources" directory next to the test executable.
    const auto resources_dir = minifi::utils::file::FileUtils::get_executable_dir() / "resources";
    configuration_->set(minifi::Configure::nifi_security_client_certificate, (resources_dir / "minifi-cpp-flow.crt").string());
    configuration_->set(minifi::Configure::nifi_security_client_private_key, (resources_dir / "minifi-cpp-flow.key").string());
    configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, "abcdefgh");
    configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, (resources_dir / "root-ca.pem").string());

    ssl_context_service_ = std::make_shared<controllers::SSLContextService>("SSLContextService", configuration_);
    ssl_context_service_->onEnable();

    // Must match the host and port configured above.
    controller_socket_data_.host = "localhost";
    controller_socket_data_.port = 9997;
  }

  // Creates and initializes the ControllerSocketProtocol under test.
  // The (misspelled) name is kept as is because the test cases call it.
  void initalizeControllerSocket(const std::shared_ptr<c2::ControllerSocketReporter>& reporter = nullptr) {
    const bool use_ssl = connection_type_ == ConnectionType::SSL_FROM_CONFIGURATION;
    if (use_ssl) {
      configuration_->set(minifi::Configure::nifi_remote_input_secure, "true");
    }
    controller_socket_protocol_ = std::make_unique<minifi::c2::ControllerSocketProtocol>(*update_sink_, configuration_, reporter);
    controller_socket_protocol_->initialize();
  }

  // Remembers the connection type and attaches or detaches the SSL context service
  // on the socket data accordingly.
  void setConnectionType(ConnectionType connection_type) {
    connection_type_ = connection_type;
    if (connection_type_ != ConnectionType::UNSECURE) {
      controller_socket_data_.ssl_context_service = ssl_context_service_;
      return;
    }
    controller_socket_data_.ssl_context_service = nullptr;
  }

 protected:
  ConnectionType connection_type_ = ConnectionType::UNSECURE;
  std::shared_ptr<minifi::Configure> configuration_;
  std::shared_ptr<TestStateController> controller_;
  std::unique_ptr<TestUpdateSink> update_sink_;
  std::unique_ptr<minifi::c2::ControllerSocketProtocol> controller_socket_protocol_;
  std::shared_ptr<controllers::SSLContextServiceInterface> ssl_context_service_;
  minifi::utils::net::SocketData controller_socket_data_;
};
// Starts and stops the test component through the controller socket, then checks
// that listComponents reports it as not running.
TEST_CASE_METHOD(ControllerTestFixture, "Test listComponents", "[controllerTests]") {
  using org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime;

  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  initalizeControllerSocket();

  auto controller_is_running = [&] { return controller_->isRunning(); };
  auto controller_is_stopped = [&] { return !controller_->isRunning(); };

  minifi::controller::startComponent(controller_socket_data_, "TestStateController");
  REQUIRE(verifyEventHappenedInPollTime(5s, controller_is_running, 20ms));

  minifi::controller::stopComponent(controller_socket_data_, "TestStateController");
  REQUIRE(verifyEventHappenedInPollTime(5s, controller_is_stopped, 20ms));

  std::stringstream component_listing;
  minifi::controller::listComponents(controller_socket_data_, component_listing);
  REQUIRE(component_listing.str() == "Components:\nTestStateController, running: false\n");
}
// Sends three clearConnection requests through the controller socket and waits
// until the update sink has counted all of them.
TEST_CASE_METHOD(ControllerTestFixture, "TestClear", "[controllerTests]") {
  using org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime;

  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  initalizeControllerSocket();

  minifi::controller::startComponent(controller_socket_data_, "TestStateController");
  auto controller_is_running = [&] { return controller_->isRunning(); };
  REQUIRE(verifyEventHappenedInPollTime(5s, controller_is_running, 20ms));

  constexpr int clear_request_count = 3;
  for (int sent = 0; sent < clear_request_count; ++sent) {
    minifi::controller::clearConnection(controller_socket_data_, "connection");
  }

  auto all_clears_received = [&] { return clear_request_count == update_sink_->clear_calls; };
  REQUIRE(verifyEventHappenedInPollTime(5s, all_clears_received, 20ms));
}
// Sends one updateFlow request through the controller socket and checks that the
// update sink counted exactly that update and no clear requests.
TEST_CASE_METHOD(ControllerTestFixture, "TestUpdate", "[controllerTests]") {
  using org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime;

  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  initalizeControllerSocket();

  minifi::controller::startComponent(controller_socket_data_, "TestStateController");
  auto controller_is_running = [&] { return controller_->isRunning(); };
  REQUIRE(verifyEventHappenedInPollTime(5s, controller_is_running, 20ms));

  std::stringstream update_output;
  minifi::controller::updateFlow(controller_socket_data_, update_output, "connection");

  auto update_received = [&] { return 1 == update_sink_->update_calls; };
  REQUIRE(verifyEventHappenedInPollTime(5s, update_received, 20ms));
  REQUIRE(0 == update_sink_->clear_calls);
}
// Without a reporter attached to the controller socket, every connection query
// must come back with its "nothing found" output.
TEST_CASE_METHOD(ControllerTestFixture, "Test connection getters on empty flow", "[controllerTests]") {
  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  initalizeControllerSocket();

  {
    // Size query for a connection name.
    std::stringstream size_output;
    minifi::controller::getConnectionSize(controller_socket_data_, size_output, "con1");
    CHECK(size_output.str() == "Size/Max of con1 not found\n");
  }

  {
    // Full connection query.
    std::stringstream full_output;
    minifi::controller::getFullConnections(controller_socket_data_, full_output);
    CHECK(full_output.str() == "0 are full\n");
  }

  {
    // Connection listing with the default third argument.
    std::stringstream listing_output;
    minifi::controller::listConnections(controller_socket_data_, listing_output);
    CHECK(listing_output.str() == "Connection Names:\n");
  }

  {
    // Connection listing with the third argument set to false.
    std::stringstream bare_listing_output;
    minifi::controller::listConnections(controller_socket_data_, bare_listing_output, false);
    CHECK(bare_listing_output.str().empty());
  }
}
// With TestControllerSocketReporter attached, the connection queries must reflect
// its canned data: con1 at 1 / 2, con2 full, and both names in the listings.
TEST_CASE_METHOD(ControllerTestFixture, "Test connection getters", "[controllerTests]") {
  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  auto canned_reporter = std::make_shared<TestControllerSocketReporter>();
  initalizeControllerSocket(canned_reporter);

  {
    // A name the reporter does not know.
    std::stringstream unknown_size_output;
    minifi::controller::getConnectionSize(controller_socket_data_, unknown_size_output, "conn");
    CHECK(unknown_size_output.str() == "Size/Max of conn not found\n");
  }

  {
    // A name the reporter knows.
    std::stringstream known_size_output;
    minifi::controller::getConnectionSize(controller_socket_data_, known_size_output, "con1");
    CHECK(known_size_output.str() == "Size/Max of con1 1 / 2\n");
  }

  {
    std::stringstream full_output;
    minifi::controller::getFullConnections(controller_socket_data_, full_output);
    CHECK(full_output.str() == "1 are full\ncon2 is full\n");
  }

  {
    // The listing order is not asserted, only the presence of each expected line.
    std::stringstream listing_output;
    minifi::controller::listConnections(controller_socket_data_, listing_output);
    auto listed = minifi::utils::string::splitRemovingEmpty(listing_output.str(), "\n");
    CHECK(listed.size() == 3);
    CHECK(ranges::find(listed, "Connection Names:") != ranges::end(listed));
    CHECK(ranges::find(listed, "con1") != ranges::end(listed));
    CHECK(ranges::find(listed, "con2") != ranges::end(listed));
  }

  {
    // Same listing with the third argument set to false: only the two names are expected.
    std::stringstream bare_listing_output;
    minifi::controller::listConnections(controller_socket_data_, bare_listing_output, false);
    auto listed = minifi::utils::string::splitRemovingEmpty(bare_listing_output.str(), "\n");
    CHECK(listed.size() == 2);
    CHECK(ranges::find(listed, "con1") != ranges::end(listed));
    CHECK(ranges::find(listed, "con2") != ranges::end(listed));
  }
}
// Fetches the manifest through the controller socket with a real
// ControllerSocketMetricsPublisher attached and looks for the cpp agent type marker.
TEST_CASE_METHOD(ControllerTestFixture, "Test manifest getter", "[controllerTests]") {
  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  auto metrics_publisher = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
  auto node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
  metrics_publisher->initialize(configuration_, node_loader);
  initalizeControllerSocket(metrics_publisher);

  std::stringstream manifest_output;
  minifi::controller::printManifest(controller_socket_data_, manifest_output);
  REQUIRE(manifest_output.str().find("\"agentType\": \"cpp\",") != std::string::npos);
}
// Fetches the stack traces through the controller socket and compares them with
// the lines TestUpdateSink::getTraces() produces (two for trace1, three for trace2).
TEST_CASE_METHOD(ControllerTestFixture, "Test jstack getter", "[controllerTests]") {
  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  auto metrics_publisher = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
  auto node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
  metrics_publisher->initialize(configuration_, node_loader);
  initalizeControllerSocket(metrics_publisher);

  const std::string expected_output =
      "trace1 -- bt line 1 for trace1\n"
      "trace1 -- bt line 2 for trace1\n"
      "trace2 -- bt line 1 for trace2\n"
      "trace2 -- bt line 2 for trace2\n"
      "trace2 -- bt line 3 for trace2\n\n";

  std::stringstream jstack_output;
  minifi::controller::getJstacks(controller_socket_data_, jstack_output);
  REQUIRE(jstack_output.str() == expected_output);
}
// Requests a debug bundle into an existing temporary directory and checks that
// debug.tar.gz shows up there.
TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle getter", "[controllerTests]") {
  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  auto metrics_publisher = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
  auto node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
  metrics_publisher->initialize(configuration_, node_loader);
  initalizeControllerSocket(metrics_publisher);

  TestController test_controller;
  auto bundle_dir = test_controller.createTempDirectory();
  REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, bundle_dir));
  REQUIRE(std::filesystem::exists(bundle_dir / "debug.tar.gz"));
}
// Requests a debug bundle into a "subfolder" path that has not been created by the test
// and checks that debug.tar.gz still shows up there. Only the unsecure connection is used.
TEST_CASE_METHOD(ControllerTestFixture, "Test debug bundle is created to non-existent folder", "[controllerTests]") {
  setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);

  auto metrics_publisher = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
  auto node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
  metrics_publisher->initialize(configuration_, node_loader);
  initalizeControllerSocket(metrics_publisher);

  TestController test_controller;
  auto missing_dir = test_controller.createTempDirectory() / "subfolder";
  REQUIRE(minifi::controller::getDebugBundle(controller_socket_data_, missing_dir));
  REQUIRE(std::filesystem::exists(missing_dir / "debug.tar.gz"));
}
// Requests a debug bundle with a target path that points at an existing regular file
// and checks that the request fails with the matching error message.
// Only the unsecure connection is used.
TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target path is an existing file", "[controllerTests]") {
  setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
  auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
  reporter->initialize(configuration_, response_node_loader);
  initalizeControllerSocket(reporter);

  TestController test_controller;
  auto invalid_path = test_controller.createTempDirectory() / "test.log";
  {
    // Create the placeholder file and make sure that really worked, so a setup problem
    // is reported here instead of surfacing as a confusing failure of the checks below.
    // The stream is closed at the end of this scope, before the bundle is requested.
    std::ofstream placeholder(invalid_path);
    REQUIRE(placeholder.is_open());
  }

  auto result = minifi::controller::getDebugBundle(controller_socket_data_, invalid_path);
  REQUIRE(!result);
  REQUIRE(result.error() == "Object specified as the target directory already exists and it is not a directory");
}
// Queries the flow status for a processor key ("TailFile") through the controller socket
// and expects the JSON report to carry the matching "no processor with key" error.
TEST_CASE_METHOD(ControllerTestFixture, "Test flow status getter", "[controllerTests]") {
  SECTION("With SSL from properties") {
    setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
  }

  SECTION("Without SSL") {
    setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
  }

  auto metrics_publisher = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
  auto node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
  metrics_publisher->initialize(configuration_, node_loader);
  initalizeControllerSocket(metrics_publisher);

  const std::string expected_report =
      "{\"controllerServiceStatusList\":null,\"connectionStatusList\":null,\"remoteProcessGroupStatusList\":null,\"instanceStatus\":null,\"systemDiagnosticsStatus\":null,"
      "\"processorStatusList\":[],\"errorsGeneratingReport\":[\"Unable to get processorStatus: No processor with key 'TailFile' to report status on\"]}\n";

  std::stringstream status_output;
  minifi::controller::getFlowStatus(controller_socket_data_, "processor:TailFile:health", status_output);
  REQUIRE(status_output.str() == expected_report);
}
} // namespace org::apache::nifi::minifi::test