blob: 21d87701ea3f14f4f2f6d0de064b69fe0eb8ce4f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include "FlowFileQueue.h"
#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "unit/TestUtils.h"
namespace core = minifi::core;
TEST_CASE("After construction, a FlowFileQueue is empty", "[FlowFileQueue]") {
utils::FlowFileQueue queue;
REQUIRE(queue.empty());
REQUIRE(queue.size() == 0); // NOLINT(readability-container-size-empty)
REQUIRE_FALSE(queue.isWorkAvailable());
REQUIRE_THROWS(queue.pop());
}
TEST_CASE("If a non-penalized flow file is added to the FlowFileQueue, we can pop it", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
const auto flow_file = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file);
REQUIRE_FALSE(queue.empty());
REQUIRE(queue.size() == 1);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file);
}
TEST_CASE("A flow file can be moved into the FlowFileQueue", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
auto penalized_flow_file = std::make_shared<core::FlowFileImpl>();
penalized_flow_file->penalize(std::chrono::milliseconds{100});
queue.push(std::move(penalized_flow_file));
queue.push(std::make_shared<core::FlowFileImpl>());
REQUIRE_FALSE(queue.empty());
REQUIRE(queue.size() == 2);
}
TEST_CASE("If three flow files are added to the FlowFileQueue, we can pop them in FIFO order", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
const auto flow_file_1 = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file_1);
const auto flow_file_2 = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file_2);
const auto flow_file_3 = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file_3);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_1);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_2);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_3);
REQUIRE_FALSE(queue.isWorkAvailable());
}
TEST_CASE("Cannot add flow files in the past preempting others", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
const auto flow_file_1 = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file_1);
const auto flow_file_2 = std::make_shared<core::FlowFileImpl>();
flow_file_2->penalize(std::chrono::seconds{-10});
queue.push(flow_file_2);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_1);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_2);
REQUIRE_FALSE(queue.isWorkAvailable());
}
namespace {
class PenaltyHasExpired {
public:
explicit PenaltyHasExpired(std::shared_ptr<core::FlowFile> flow_file) : flow_file_(std::move(flow_file)) {}
bool operator()() { return !flow_file_->isPenalized(); }
private:
std::shared_ptr<core::FlowFile> flow_file_;
};
} // namespace
TEST_CASE("Penalized flow files are popped from the FlowFileQueue in the order their penalties expire", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
const auto flow_file_1 = std::make_shared<core::FlowFileImpl>();
flow_file_1->penalize(std::chrono::milliseconds{70});
queue.push(flow_file_1);
const auto flow_file_2 = std::make_shared<core::FlowFileImpl>();
flow_file_2->penalize(std::chrono::milliseconds{50});
queue.push(flow_file_2);
const auto flow_file_3 = std::make_shared<core::FlowFileImpl>();
flow_file_3->penalize(std::chrono::milliseconds{80});
queue.push(flow_file_3);
const auto flow_file_4 = std::make_shared<core::FlowFileImpl>();
flow_file_4->penalize(std::chrono::milliseconds{60});
queue.push(flow_file_4);
REQUIRE_FALSE(queue.isWorkAvailable());
REQUIRE(minifi::test::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_2}, std::chrono::milliseconds{10}));
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_2);
REQUIRE(minifi::test::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_4}, std::chrono::milliseconds{10}));
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_4);
REQUIRE(minifi::test::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_1}, std::chrono::milliseconds{10}));
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_1);
REQUIRE(minifi::test::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{flow_file_3}, std::chrono::milliseconds{10}));
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file_3);
REQUIRE_FALSE(queue.isWorkAvailable());
}
TEST_CASE("If a penalized then a non-penalized flow file is added to the FlowFileQueue, pop() returns the correct one", "[FlowFileQueue][pop]") {
utils::FlowFileQueue queue;
const auto penalized_flow_file = std::make_shared<core::FlowFileImpl>();
penalized_flow_file->penalize(std::chrono::milliseconds{10});
queue.push(penalized_flow_file);
const auto flow_file = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file);
SECTION("Try popping right away") {
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file);
REQUIRE_FALSE(queue.isWorkAvailable());
}
SECTION("Wait until the penalty expires, then pop") {
REQUIRE(minifi::test::utils::verifyEventHappenedInPollTime(std::chrono::seconds{1}, PenaltyHasExpired{penalized_flow_file}, std::chrono::milliseconds{10}));
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == flow_file);
REQUIRE(queue.isWorkAvailable());
REQUIRE(queue.pop() == penalized_flow_file);
REQUIRE_FALSE(queue.isWorkAvailable());
}
}
TEST_CASE("Force pop on FlowFileQueue returns the flow files, whether penalized or not", "[FlowFileQueue][forcePop]") {
utils::FlowFileQueue queue;
const auto penalized_flow_file = std::make_shared<core::FlowFileImpl>();
penalized_flow_file->penalize(std::chrono::milliseconds{10});
queue.push(penalized_flow_file);
const auto flow_file = std::make_shared<core::FlowFileImpl>();
queue.push(flow_file);
REQUIRE_FALSE(queue.empty());
REQUIRE(queue.pop() == flow_file);
REQUIRE(queue.pop() == penalized_flow_file);
REQUIRE(queue.empty());
}