blob: 79267b20bfc48e0092881b594448c22dc6e94a32 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include "unit/TestBase.h"
#include "integration/HTTPIntegrationBase.h"
#include "integration/HTTPHandlers.h"
#include "unit/Catch.h"
#include "core/Processor.h"
#include "core/controller/ControllerService.h"
#include "core/Resource.h"
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::test {
class DummyController : public core::controller::ControllerServiceImpl {
public:
explicit DummyController(std::string_view name, const minifi::utils::Identifier &uuid = {}) : ControllerServiceImpl(name, uuid) {}
static constexpr const char* Description = "Dummy Controller";
static constexpr auto DummyControllerProperty = core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Property")
.withDescription("Dummy Controller Property")
.build();
static constexpr auto Properties = std::to_array<core::PropertyReference>({DummyControllerProperty});
static constexpr bool SupportsDynamicProperties = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
void initialize() override {
setSupportedProperties(Properties);
}
void yield() override {
}
bool isWorkAvailable() override {
return false;
}
bool isRunning() const override {
return getState() == core::controller::ControllerServiceState::ENABLED;
}
void onEnable() override {
auto dummy_controller_property = getProperty(DummyControllerProperty.name);
if (!dummy_controller_property || dummy_controller_property->empty()) {
throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing dummy property");
}
}
private:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DummyController>::getLogger(uuid_);
};
REGISTER_RESOURCE(DummyController, ControllerService);
class DummmyControllerUserProcessor : public minifi::core::ProcessorImpl {
using minifi::core::ProcessorImpl::ProcessorImpl;
public:
DummmyControllerUserProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {}
explicit DummmyControllerUserProcessor(std::string_view name) : ProcessorImpl(name) {}
static constexpr auto DummyControllerService = core::PropertyDefinitionBuilder<>::createProperty("Dummy Controller Service")
.withDescription("Dummy Controller Service")
.withAllowedTypes<DummyController>()
.build();
void initialize() override {
setSupportedProperties(Properties);
}
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) override {
if (auto controller_service = context.getProperty(DummmyControllerUserProcessor::DummyControllerService)) {
if (!std::dynamic_pointer_cast<DummyController>(context.getControllerService(*controller_service, uuid_))) {
throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service");
}
} else {
throw minifi::Exception(minifi::ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing controller service");
}
logger_->log_debug("DummyControllerUserProcessor::onSchedule successful");
}
static constexpr const char* Description = "A processor that uses controller.";
static constexpr auto Properties = std::array<core::PropertyReference, 1>{DummyControllerService};
static constexpr auto Relationships = std::array<core::RelationshipDefinition, 0>{};
static constexpr bool SupportsDynamicProperties = false;
static constexpr bool SupportsDynamicRelationships = false;
static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;
static constexpr bool IsSingleThreaded = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
private:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<DummmyControllerUserProcessor>::getLogger(uuid_);
};
REGISTER_RESOURCE(DummmyControllerUserProcessor, Processor);
class VerifyC2ControllerUpdate : public VerifyC2Base {
public:
explicit VerifyC2ControllerUpdate(const std::atomic_bool& flow_updated_successfully) : flow_updated_successfully_(flow_updated_successfully) {
}
void testSetup() override {
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
LogTestController::getInstance().setDebug<DummmyControllerUserProcessor>();
LogTestController::getInstance().setDebug<DummyController>();
LogTestController::getInstance().setDebug<core::controller::StandardControllerServiceProvider>();
VerifyC2Base::testSetup();
}
void runAssertions() override {
using org::apache::nifi::minifi::test::utils::verifyEventHappenedInPollTime;
REQUIRE(verifyEventHappenedInPollTime(40s, [&] { return flow_updated_successfully_.load(); }, 1s));
}
private:
const std::atomic_bool& flow_updated_successfully_;
};
class ControllerUpdateHandler: public HeartbeatHandler {
public:
explicit ControllerUpdateHandler(std::atomic_bool& flow_updated_successfully, std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& replacement_config_path)
: HeartbeatHandler(std::move(configuration)),
flow_updated_successfully_(flow_updated_successfully),
replacement_config_(minifi::utils::file::get_content(replacement_config_path.string())) {
}
void handleHeartbeat(const rapidjson::Document& /*root*/, struct mg_connection* conn) override {
switch (test_state_) {
case TestState::VERIFY_INITIAL_METRICS: {
sendEmptyHeartbeatResponse(conn);
REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "Could not enable DummyController"));
REQUIRE(minifi::test::utils::verifyLogLinePresenceInPollTime(5s, "(DummmyControllerUserProcessor): Process Schedule Operation: Invalid controller service"));
test_state_ = TestState::SEND_NEW_CONFIG;
break;
}
case TestState::SEND_NEW_CONFIG: {
sendHeartbeatResponse("UPDATE", "configuration", "889349", conn, {{"configuration_data", minifi::c2::C2Value{replacement_config_}}});
test_state_ = TestState::VERIFY_UPDATED_METRICS;
break;
}
case TestState::VERIFY_UPDATED_METRICS: {
sendEmptyHeartbeatResponse(conn);
if (minifi::test::utils::verifyLogLinePresenceInPollTime(0s, "DummyControllerUserProcessor::onSchedule successful")) {
flow_updated_successfully_ = true;
}
break;
}
}
}
private:
enum class TestState {
VERIFY_INITIAL_METRICS,
SEND_NEW_CONFIG,
VERIFY_UPDATED_METRICS
};
static void sendEmptyHeartbeatResponse(struct mg_connection* conn) {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
}
std::atomic_bool& flow_updated_successfully_;
TestState test_state_ = TestState::VERIFY_INITIAL_METRICS;
std::string replacement_config_;
};
TEST_CASE("C2ControllerEnableFailureTest", "[c2test]") {
std::atomic_bool flow_updated_successfully{false};
VerifyC2ControllerUpdate harness(flow_updated_successfully);
const auto test_file_path = std::filesystem::path(TEST_RESOURCES) / "TestC2InvalidController.yml";
auto replacement_path = test_file_path.string();
minifi::utils::string::replaceAll(replacement_path, "TestC2InvalidController", "TestC2ValidController");
ControllerUpdateHandler handler(flow_updated_successfully, harness.getConfiguration(), replacement_path);
harness.setUrl("https://localhost:0/api/heartbeat", &handler);
harness.run(test_file_path);
}
} // namespace org::apache::nifi::minifi::test