blob: 51721b39118b76974b06952792b968841ccc5983 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iostream>
#include "integration/IntegrationBase.h"
#include "unit/StatefulProcessor.h"
#include "unit/TestBase.h"
#include "unit/TestUtils.h"
#include "core/state/ProcessorController.h"
#include "unit/Catch.h"
namespace org::apache::nifi::minifi::test {
using minifi::processors::StatefulProcessor;
using minifi::state::ProcessorController;
using LogChecker = std::function<bool()>;
struct HookCollection {
StatefulProcessor::HookType on_schedule_hook_;
std::vector<StatefulProcessor::HookType> on_trigger_hooks_;
LogChecker log_checker_;
};
class StatefulIntegrationTest : public IntegrationBase {
public:
explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection)
: on_schedule_hook_(std::move(hookCollection.on_schedule_hook_))
, on_trigger_hooks_(std::move(hookCollection.on_trigger_hooks_))
, log_checker_(hookCollection.log_checker_)
, test_case_(std::move(testCase)) {
}
void testSetup() override {
LogTestController::getInstance().reset();
LogTestController::getInstance().setDebug<core::ProcessGroup>();
LogTestController::getInstance().setDebug<core::Processor>();
LogTestController::getInstance().setDebug<core::ProcessSession>();
LogTestController::getInstance().setDebug<StatefulIntegrationTest>();
logger_->log_info("Running test case \"{}\"", test_case_);
}
void updateProperties(minifi::FlowController& fc) override {
/* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor
* (See TestStateTransactionality.yml)
* In this case there are two components in the flowcontroller: first is the processor that the test uses,
* second is the controller itself.
* Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file,
* that most probably means a breaking change. */
size_t controllerVecIdx = 0;
fc.executeOnAllComponents([this, &controllerVecIdx](minifi::state::StateController& component){
if (controllerVecIdx == 1) {
REQUIRE(component.getComponentName() == "FlowController");
} else if (controllerVecIdx == 0) {
REQUIRE(component.getComponentName() == "statefulProcessor");
// set hooks
const auto processController = dynamic_cast<ProcessorController*>(&component);
REQUIRE(processController != nullptr);
stateful_processor_ = &processController->getProcessor();
REQUIRE(stateful_processor_);
stateful_processor_.get().setHooks(on_schedule_hook_, on_trigger_hooks_);
}
++controllerVecIdx;
});
// check controller vector size
REQUIRE(controllerVecIdx == 2);
}
void runAssertions() override {
REQUIRE(utils::verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] {
return stateful_processor_.get().hasFinishedHooks() && log_checker_();
}));
}
private:
const StatefulProcessor::HookType on_schedule_hook_;
const std::vector<StatefulProcessor::HookType> on_trigger_hooks_;
const LogChecker log_checker_;
const std::string test_case_;
TypedProcessorWrapper<StatefulProcessor> stateful_processor_ = nullptr;
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()};
};
const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}};
const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}};
auto standardLogChecker = [] {
const std::string logs = LogTestController::getInstance().getLogs();
const auto errorResult = minifi::utils::string::countOccurrences(logs, "[error]");
const auto warningResult = minifi::utils::string::countOccurrences(logs, "[warning]");
return errorResult.second == 0 && warningResult.second == 0;
};
auto exceptionRollbackWarnings = [] {
const std::string logs = LogTestController::getInstance().getLogs();
const auto errorResult = minifi::utils::string::countOccurrences(logs, "[error]");
const auto exceptionWarningResult = minifi::utils::string::countOccurrences(logs, "[warning] Caught \"Triggering rollback\"");
const auto rollbackWarningResult = minifi::utils::string::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed");
return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1;
};
const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
{"State_is_recorded_after_committing", {
{},
{
[] (core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[] (core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
}
},
standardLogChecker
}},
{"State_is_discarded_after_rolling_back", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState2));
throw std::runtime_error("Triggering rollback");
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
}
},
exceptionRollbackWarnings
}},
{
"Get_in_onSchedule_without_previous_state", {
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(!stateManager.get(state));
REQUIRE(state.empty());
},
{},
standardLogChecker
}
},
{
"Set_in_onSchedule", {
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
{
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
}
},
standardLogChecker
}
},
{
"Clear_in_onSchedule", {
[](core::StateManager& stateManager) {
REQUIRE(!stateManager.clear());
REQUIRE(stateManager.set(exampleState));
REQUIRE(stateManager.clear());
},
{
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(!stateManager.get(state));
REQUIRE(state.empty());
}
},
standardLogChecker
},
},
{
"Persist_in_onSchedule", {
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
}
},
{},
standardLogChecker
}
},
{
"Manual_beginTransaction", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(!stateManager.beginTransaction());
}
},
standardLogChecker
},
},
{
"Manual_commit", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
REQUIRE(stateManager.commit());
}
},
standardLogChecker
},
},
{
"Manual_rollback", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.rollback());
}
},
standardLogChecker
},
},
{
"Get_without_previous_state", {
{},
{
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(!stateManager.get(state));
REQUIRE(state.empty());
}
},
standardLogChecker
},
},
{
"(set),(get,get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
}
},
standardLogChecker
},
},
{
"(set),(get,set)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
REQUIRE(stateManager.set(exampleState));
}
},
standardLogChecker
},
},
{
"(set),(get,clear)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
REQUIRE(stateManager.clear());
}
},
standardLogChecker
},
},
{
"(set),(get,persist)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
REQUIRE(stateManager.persist());
}
},
standardLogChecker
},
},
{
"(set,!get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
std::unordered_map<std::string, std::string> state;
REQUIRE(!stateManager.get(state));
REQUIRE(state.empty());
},
},
standardLogChecker
},
},
{
"(set,set)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
REQUIRE(stateManager.set(exampleState));
},
},
standardLogChecker
},
},
{
"(set,!clear)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
REQUIRE(!stateManager.clear());
},
},
standardLogChecker
},
},
{
"(set,persist)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
REQUIRE(stateManager.persist());
},
},
standardLogChecker
},
},
{
"(set),(clear,!get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
std::unordered_map<std::string, std::string> state;
REQUIRE(!stateManager.get(state));
REQUIRE(state.empty());
}
},
standardLogChecker
},
},
{
"(set),(clear),(!get),(set),(get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(!stateManager.get(state));
REQUIRE(state.empty());
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
}
},
standardLogChecker
},
},
{
"(set),(clear,set)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
REQUIRE(stateManager.set(exampleState));
},
},
standardLogChecker
},
},
{
"(set),(clear),(!clear)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
},
[](core::StateManager& stateManager) {
REQUIRE(!stateManager.clear());
},
},
standardLogChecker
},
},
{
"(set),(clear),(persist)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
},
},
standardLogChecker
},
},
{
"(persist),(set),(get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
},
},
standardLogChecker
},
},
{
"(persist),(set),(clear)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
},
},
standardLogChecker
},
},
{
"(persist),(persist)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
},
},
standardLogChecker
},
},
{
"No_change_2_rounds", {
{},
{
[](core::StateManager&) {},
[](core::StateManager&) {},
},
standardLogChecker
},
},
{
"(!clear)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(!stateManager.clear());
},
},
standardLogChecker
},
},
{
"(set),(get,throw)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
throw std::runtime_error("Triggering rollback");
},
},
exceptionRollbackWarnings
}
},
{
"(set),(clear,throw),(get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.set(exampleState));
},
[](core::StateManager& stateManager) {
REQUIRE(stateManager.clear());
throw std::runtime_error("Triggering rollback");
},
[](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
REQUIRE(stateManager.get(state));
REQUIRE(state == exampleState);
},
},
exceptionRollbackWarnings
}
},
{
"(set),(clear,throw),(get)", {
{},
{
[](core::StateManager& stateManager) {
REQUIRE(stateManager.persist());
throw std::runtime_error("Triggering rollback");
},
},
exceptionRollbackWarnings
}
}
};
TEST_CASE("Test state transactionality", "[statemanagement]") {
for (const auto& test : testCasesToHookLists) {
StatefulIntegrationTest statefulIntegrationTest(test.first, test.second);
const auto test_file_location = std::filesystem::path(TEST_RESOURCES) / "TestStateTransactionality.yml";
statefulIntegrationTest.run(test_file_location);
}
}
} // namespace org::apache::nifi::minifi::test