| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <span> |
| #include <string> |
| #include <vector> |
| #include <utility> |
| #include <memory> |
| |
| #include "Connection.h" |
| #include "unit/TestBase.h" |
| #include "unit/TestUtils.h" |
| #include "unit/Catch.h" |
| #include "unit/ProvenanceTestHelper.h" |
| #include "utils/gsl.h" |
| |
| namespace org::apache::nifi::minifi::test { |
| |
| using Timepoint = std::chrono::time_point<std::chrono::steady_clock>; |
| |
| enum EventKind { |
| Store, Load |
| }; |
| |
| struct SwapEvent { |
| EventKind kind; |
| std::vector<minifi::SwappedFlowFile> flow_files; |
| |
| void verifyTimes(std::initializer_list<unsigned> seconds) { |
| REQUIRE(flow_files.size() == seconds.size()); |
| size_t idx = 0; |
| for (auto& second : seconds) { |
| REQUIRE(flow_files[idx].to_be_processed_after == Timepoint{std::chrono::seconds{second}}); |
| ++idx; |
| } |
| } |
| }; |
| |
| class SwappingFlowFileTestRepo : public TestFlowRepository, public minifi::SwapManager { |
| public: |
| void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override { |
| std::vector<minifi::SwappedFlowFile> ids; |
| for (const auto& ff : flow_files) { |
| ids.push_back(minifi::SwappedFlowFile{ff->getUUID(), ff->getPenaltyExpiration()}); |
| minifi::io::BufferStream output; |
| std::dynamic_pointer_cast<minifi::FlowFileRecord>(ff)->Serialize(output); |
| Put(ff->getUUIDStr().c_str(), reinterpret_cast<const uint8_t*>(output.getBuffer().data()), output.size()); |
| } |
| swap_events_.push_back({Store, ids}); |
| } |
| |
| std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<minifi::SwappedFlowFile> flow_files) override { |
| swap_events_.push_back({Load, flow_files}); |
| LoadTask load_task; |
| auto future = load_task.promise.get_future(); |
| load_task.result.reserve(flow_files.size()); |
| for (const auto& ff_id : flow_files) { |
| std::string value; |
| Get(ff_id.id.to_string().c_str(), value); |
| minifi::utils::Identifier container_id; |
| auto ff = minifi::FlowFileRecord::DeSerialize(gsl::make_span(value).as_span<const std::byte>(), content_repo_, container_id); |
| ff->setPenaltyExpiration(ff_id.to_be_processed_after); |
| load_task.result.push_back(std::move(ff)); |
| } |
| load_tasks_.push_back(std::move(load_task)); |
| return future; |
| } |
| |
| struct LoadTask { |
| std::promise<std::vector<std::shared_ptr<core::FlowFile>>> promise; |
| std::vector<std::shared_ptr<core::FlowFile>> result; |
| |
| void complete() { |
| promise.set_value(result); |
| } |
| }; |
| |
| std::vector<LoadTask> load_tasks_; |
| std::vector<SwapEvent> swap_events_; |
| }; |
| |
| using FlowFilePtr = std::shared_ptr<core::FlowFile>; |
| using FlowFilePtrVec = std::vector<FlowFilePtr>; |
| |
| struct FlowFileComparator { |
| bool operator()(const FlowFilePtr& left, const FlowFilePtr& right) const { |
| return left->getPenaltyExpiration() < right->getPenaltyExpiration(); |
| } |
| }; |
| |
| struct VerifiedQueue { |
| void push(FlowFilePtr ff) { |
| size(); |
| impl.push(ff); |
| ref_.insert(std::lower_bound(ref_.begin(), ref_.end(), ff, FlowFileComparator{}), ff); |
| size(); |
| } |
| |
| FlowFilePtr poll() { |
| size(); |
| FlowFilePtr ff = impl.pop(); |
| REQUIRE(!ref_.empty()); |
| // the order when flow files have the same penalty is not fixed |
| REQUIRE(ff->getPenaltyExpiration() == ref_.front()->getPenaltyExpiration()); |
| ref_.erase(ref_.begin()); |
| size(); |
| return ff; |
| } |
| |
| void verify(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) const { |
| // check live ffs |
| auto live_copy = utils::FlowFileQueueTestAccessor::get_queue_(impl); |
| REQUIRE(live_copy.size() == live.size()); |
| for (auto sec : live) { |
| auto min = live_copy.popMin(); |
| REQUIRE(min->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}}); |
| } |
| |
| // check inter ffs |
| if (!inter) { |
| REQUIRE_FALSE(utils::FlowFileQueueTestAccessor::get_load_task_(impl).has_value()); |
| } else { |
| auto& intermediate = utils::FlowFileQueueTestAccessor::get_load_task_(impl)->intermediate_items; |
| REQUIRE(intermediate.size() == inter->size()); |
| size_t idx = 0; |
| for (auto sec : inter.value()) { |
| REQUIRE(intermediate[idx]->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}}); |
| ++idx; |
| } |
| } |
| |
| // check swapped ffs |
| auto swapped_copy = utils::FlowFileQueueTestAccessor::get_swapped_flow_files_(impl); |
| REQUIRE(swapped_copy.size() == swapped.size()); |
| for (auto sec : swapped) { |
| auto min = swapped_copy.popMin(); |
| REQUIRE(min.to_be_processed_after == Timepoint{std::chrono::seconds{sec}}); |
| } |
| } |
| |
| bool isWorkAvailable() const { |
| return impl.isWorkAvailable(); |
| } |
| |
| size_t size() const { |
| size_t result = impl.size(); |
| REQUIRE(result == ref_.size()); |
| return result; |
| } |
| |
| explicit VerifiedQueue(std::shared_ptr<minifi::SwapManager> swap_manager) |
| : impl(std::move(swap_manager)) {} |
| |
| minifi::utils::FlowFileQueue impl; |
| FlowFilePtrVec ref_; |
| }; |
| |
| class SwapTestController : public TestController { |
| public: |
| SwapTestController() { |
| content_repo_ = std::make_shared<core::repository::VolatileContentRepository>(); |
| flow_repo_ = std::make_shared<SwappingFlowFileTestRepo>(); |
| flow_repo_->loadComponent(content_repo_); |
| clock_ = std::make_shared<minifi::test::utils::ManualClock>(); |
| minifi::utils::timeutils::setClock(clock_); |
| queue_ = std::make_shared<VerifiedQueue>(std::static_pointer_cast<minifi::SwapManager>(flow_repo_)); |
| } |
| |
| void setLimits(size_t min_size, size_t target_size, size_t max_size) { |
| queue_->impl.setMinSize(min_size); |
| queue_->impl.setTargetSize(target_size); |
| queue_->impl.setMaxSize(max_size); |
| } |
| |
| struct SwapEventPattern { |
| EventKind kind; |
| std::initializer_list<unsigned > seconds; |
| }; |
| |
| void verifySwapEvents(std::vector<SwapEventPattern> events) { |
| REQUIRE(flow_repo_->swap_events_.size() == events.size()); |
| size_t idx = 0; |
| for (auto& pattern : events) { |
| REQUIRE(pattern.kind == flow_repo_->swap_events_[idx].kind); |
| flow_repo_->swap_events_[idx].verifyTimes(pattern.seconds); |
| } |
| } |
| |
| void clearSwapEvents() { |
| flow_repo_->swap_events_.clear(); |
| } |
| |
| void verifyQueue(std::initializer_list<unsigned> live, std::optional<std::initializer_list<unsigned>> inter, std::initializer_list<unsigned> swapped) { |
| queue_->verify(live, inter, swapped); |
| } |
| |
| void pushAll(std::initializer_list<unsigned> seconds) { |
| for (auto sec : seconds) { |
| auto ff = std::static_pointer_cast<core::FlowFile>(std::make_shared<minifi::FlowFileRecordImpl>()); |
| ff->setPenaltyExpiration(Timepoint{std::chrono::seconds{sec}}); |
| queue_->push(std::move(ff)); |
| } |
| } |
| |
| void popAll(std::initializer_list<unsigned> seconds, bool check_is_work_available = false) { |
| for (auto sec : seconds) { |
| if (check_is_work_available) { |
| REQUIRE(queue_->isWorkAvailable()); |
| } |
| auto ff = queue_->poll(); |
| REQUIRE(ff->getPenaltyExpiration() == Timepoint{std::chrono::seconds{sec}}); |
| } |
| } |
| |
| std::shared_ptr<SwappingFlowFileTestRepo> flow_repo_; |
| std::shared_ptr<core::repository::VolatileContentRepository> content_repo_; |
| std::shared_ptr<VerifiedQueue> queue_; |
| std::shared_ptr<minifi::test::utils::ManualClock> clock_; |
| }; |
| |
| TEST_CASE("Setting swap threshold sets underlying queue limits", "[SwapTest1]") { |
| const size_t target_size = 4; |
| const size_t min_size = target_size / 2; |
| const size_t max_size = target_size * 3 / 2; |
| |
| minifi::ConnectionImpl conn(nullptr, nullptr, ""); |
| conn.setSwapThreshold(target_size); |
| REQUIRE(utils::FlowFileQueueTestAccessor::get_min_size_(utils::ConnectionTestAccessor::get_queue_(conn)) == min_size); |
| REQUIRE(utils::FlowFileQueueTestAccessor::get_target_size_(utils::ConnectionTestAccessor::get_queue_(conn)) == target_size); |
| REQUIRE(utils::FlowFileQueueTestAccessor::get_max_size_(utils::ConnectionTestAccessor::get_queue_(conn)) == max_size); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Default constructed FlowFileQueue won't swap", "[SwapTest2]") { |
| for (unsigned i = 0; i < 100; ++i) { |
| pushAll({i}); |
| } |
| |
| REQUIRE(queue_->impl.size() == 100); |
| |
| clock_->advance(std::chrono::seconds{200}); |
| for (size_t i = 0; i < 100; ++i) { |
| queue_->poll(); |
| } |
| |
| REQUIRE(queue_->impl.empty()); |
| verifySwapEvents({}); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Up to max no swap-out is triggered", "[SwapTest3]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40}); |
| |
| REQUIRE_FALSE(queue_->isWorkAvailable()); |
| verifySwapEvents({}); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Pushing beyond max triggers a swap-out", "[SwapTest4]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40}); |
| |
| pushAll({28}); |
| // size goes from 7 to 4, 3 largest must have been swapped out |
| verifySwapEvents({{Store, {60, 50, 40}}}); |
| verifyQueue({10, 20, 28, 30}, {}, {40, 50, 60}); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Popping until min size does not trigger swap-in", "[SwapTest5]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40, 28}); |
| clearSwapEvents(); |
| clock_->advance(std::chrono::seconds{35}); |
| REQUIRE(queue_->isWorkAvailable()); |
| popAll({10, 20}); |
| verifyQueue({28, 30}, {}, {40, 50, 60}); |
| verifySwapEvents({}); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Popping beyond min size triggers swap-in", "[SwapTest6]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40, 28}); |
| clearSwapEvents(); |
| clock_->advance(std::chrono::seconds{35}); |
| popAll({10, 20, 28}); |
| |
| // trying to swap-in all three swapped flow files |
| verifyQueue({30}, {{}}, {}); |
| verifySwapEvents({{Load, {40, 50, 60}}}); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Pushing while a swap-in is pending", "[SwapTest7]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40, 28}); |
| clock_->advance(std::chrono::seconds{35}); |
| popAll({10, 20, 28}); |
| verifyQueue({30}, {{}}, {}); |
| clearSwapEvents(); |
| |
| SECTION("Pushing into the pending swap-in range") { |
| pushAll({45}); |
| verifyQueue({30}, {{45}}, {}); |
| verifySwapEvents({}); |
| } |
| |
| SECTION("Pushing before the pending swap-in range") { |
| pushAll({35}); |
| verifyQueue({30, 35}, {{}}, {}); |
| verifySwapEvents({}); |
| } |
| |
| SECTION("Pushing after the pending swap-in range") { |
| pushAll({65}); |
| verifyQueue({30}, {{}}, {65}); |
| verifySwapEvents({{Store, {65}}}); |
| } |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "isWorkAvailable depends on the swap-in task", "[SwapTest8]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40, 28}); |
| popAll({10, 20, 28}); |
| verifyQueue({30}, {{}}, {}); |
| |
| REQUIRE_FALSE(queue_->isWorkAvailable()); |
| |
| clock_->advance(std::chrono::seconds{35}); |
| |
| REQUIRE(queue_->isWorkAvailable()); |
| popAll({30}); |
| REQUIRE_FALSE(queue_->isWorkAvailable()); |
| |
| SECTION("Load is completed but the minimum of those files is still not viable") { |
| flow_repo_->load_tasks_[0].complete(); |
| REQUIRE_FALSE(queue_->isWorkAvailable()); |
| } |
| |
| SECTION("The minimum of the load task is viable but not yet completed") { |
| clock_->advance(std::chrono::seconds{35}); |
| REQUIRE_FALSE(queue_->isWorkAvailable()); |
| // completing the task renders it viable |
| flow_repo_->load_tasks_[0].complete(); |
| REQUIRE(queue_->isWorkAvailable()); |
| } |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Polling from load task", "[SwapTest8]") { |
| setLimits(2, 4, 6); |
| pushAll({50, 20, 30, 60, 10, 40, 28}); |
| popAll({10, 20, 28}); |
| pushAll({45}); |
| verifyQueue({30}, {{45}}, {}); |
| |
| flow_repo_->load_tasks_[0].complete(); |
| |
| clock_->advance(std::chrono::seconds{100}); |
| |
| popAll({30, 40, 45, 50, 60}, true); |
| verifyQueue({}, {}, {}); |
| } |
| |
| TEST_CASE_METHOD(SwapTestController, "Popping below min checks if the pending load is finished", "[SwapTest8]") { |
| setLimits(6, 8, 10); |
| pushAll({10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120}); |
| verifyQueue({10, 20, 30, 40, 50, 60, 70, 80}, {}, {90, 100, 110, 120}); |
| clock_->advance(std::chrono::seconds{200}); |
| clearSwapEvents(); |
| popAll({10, 20, 30}); |
| verifySwapEvents({{Load, {90, 100, 110}}}); |
| verifyQueue({40, 50, 60, 70, 80}, {{}}, {120}); |
| clearSwapEvents(); |
| |
| popAll({40, 50}); |
| verifyQueue({60, 70, 80}, {{}}, {120}); |
| flow_repo_->load_tasks_[0].complete(); |
| popAll({60}); |
| // even though the live queue is not empty we check if |
| // the load_task is finished and initiate a load if need be |
| verifySwapEvents({{Load, {120}}}); |
| verifyQueue({70, 80, 90, 100, 110}, {{}}, {}); |
| } |
| |
| } // namespace org::apache::nifi::minifi::test |