blob: 837108dff1aeb20a023b6f8bd342c8a81c3d1e92 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "utils/FlowFileQueue.h"
#include "core/logging/LoggerFactory.h"
namespace org::apache::nifi::minifi::utils {
// Strict ordering of flow files by penalty expiration: the flow file whose
// penalty expires earlier compares less.
bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const value_type& left, const value_type& right) const {
  const auto left_expiration = left->getPenaltyExpiration();
  const auto right_expiration = right->getPenaltyExpiration();
  return left_expiration < right_expiration;
}
// Strict ordering of swapped-out flow file records by to_be_processed_after:
// the record that becomes processable earlier compares less.
bool FlowFileQueue::SwappedFlowFileComparator::operator()(const SwappedFlowFile& left, const SwappedFlowFile& right) const {
  const auto& left_due = left.to_be_processed_after;
  const auto& right_due = right.to_be_processed_after;
  return left_due < right_due;
}
FlowFileQueue::FlowFileQueue(std::shared_ptr<SwapManager> swap_manager)
: swap_manager_(std::move(swap_manager)),
logger_(core::logging::LoggerFactory<FlowFileQueue>::getLogger()) {}
// Pops the minimum element, passing "no timeout" to tryPopImpl so that a
// pending load task is waited for without a deadline.
// Throws std::bad_optional_access if tryPopImpl could not produce an element.
FlowFileQueue::value_type FlowFileQueue::pop() {
  auto popped = tryPopImpl(std::nullopt);
  return std::move(popped).value();
}
// Non-waiting pop: forwards a zero timeout, so a pending load task is only
// polled. Returns std::nullopt when nothing could be popped.
std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop() {
  const std::chrono::milliseconds no_wait{0};
  return tryPopImpl(no_wait);
}
// Pop that waits at most `timeout` for a pending load task to complete.
// Returns std::nullopt when nothing could be popped.
std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop(std::chrono::milliseconds timeout) {
  const std::optional<std::chrono::milliseconds> bounded_wait{timeout};
  return tryPopImpl(bounded_wait);
}
std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPopImpl(std::optional<std::chrono::milliseconds> timeout) {
std::optional<std::shared_ptr<core::FlowFile>> result;
if (!queue_.empty()) {
result = queue_.popMin();
if (processLoadTaskWait(std::chrono::milliseconds{0})) {
initiateLoadIfNeeded();
}
return result;
}
if (load_task_) {
logger_->log_debug("Head is empty checking already running load task");
if (!processLoadTaskWait(timeout)) {
return std::nullopt;
}
if (!queue_.empty()) {
// load provided items
result = queue_.popMin();
initiateLoadIfNeeded();
return result;
}
}
// no pending load_task_ and no items in the queue_
initiateLoadIfNeeded();
return std::nullopt;
}
// Tries to finish the pending load task and merge its results into queue_.
//
// timeout: maximum time to wait for the load to complete; when it is
//          std::nullopt no wait_for() is issued and the get() below is called
//          directly.
// Returns true if there is no load task, or if the load task completed and both
// its loaded items and its intermediate_items were pushed into queue_ (the load
// task is reset afterwards). Returns false if the load did not complete within
// the timeout, in which case load_task_ is left untouched.
bool FlowFileQueue::processLoadTaskWait(std::optional<std::chrono::milliseconds> timeout) {
  if (!load_task_) {
    return true;
  }

  const std::future_status status = timeout ? load_task_->items.wait_for(*timeout) : std::future_status::ready;
  if (status == std::future_status::timeout) {
    logger_->log_debug("Load task is not yet completed");
    return false;
  }
  gsl_Assert(status == std::future_status::ready);

  logger_->log_debug("Getting loaded flow files");
  size_t loaded_total = 0;
  for (auto&& loaded : load_task_->items.get()) {
    ++loaded_total;
    queue_.push(std::move(loaded));
  }

  // flow files pushed while the load was running that fell into the loaded range
  size_t pending_total = 0;
  for (auto&& pending : load_task_->intermediate_items) {
    ++pending_total;
    queue_.push(std::move(pending));
  }

  load_task_.reset();
  logger_->log_debug("Swapped in '{}' flow files and committed '{}' pending files", loaded_total, pending_total);
  return true;
}
// Inserts a flow file, routing it by its penalty expiration to one of:
//   - the in-memory queue_,
//   - the intermediate_items of the pending load task, or
//   - the swap manager (it is stored immediately and tracked in swapped_flow_files_).
// Afterwards, if shouldSwapOutCount() reports an excess and no load task is
// pending, that many of the maximum elements of queue_ are swapped out as well.
void FlowFileQueue::push(value_type element) {
  // do not allow pushing elements in the past
  element->setPenaltyExpiration(std::max(element->getPenaltyExpiration(), clock_->now()));
  const auto expiration = element->getPenaltyExpiration();

  std::vector<value_type> to_swap_out;
  if (!load_task_) {
    const bool after_swapped_head = !swapped_flow_files_.empty()
        && swapped_flow_files_.min().to_be_processed_after < expiration;
    if (after_swapped_head) {
      // flow file goes into the swapped_flow_files_ set, i.e. immediately swapped out
      to_swap_out.push_back(std::move(element));
    } else {
      queue_.push(std::move(element));
    }
  } else if (expiration <= load_task_->min) {
    // flow file goes before load_task_
    queue_.push(std::move(element));
  } else if (load_task_->max <= expiration) {
    // flow file goes after load_task_, i.e. immediately swapped out
    to_swap_out.push_back(std::move(element));
  } else {
    // flow file belongs to the same range that is being swapped in
    load_task_->intermediate_items.push_back(std::move(element));
  }

  const size_t excess_count = shouldSwapOutCount();
  // we cannot initiate a queue_ swap while a load_task_ is pending
  if (excess_count != 0 && !load_task_) {
    to_swap_out.reserve(to_swap_out.size() + excess_count);
    for (size_t remaining = excess_count; remaining != 0; --remaining) {
      to_swap_out.push_back(queue_.popMax());
    }
  }

  if (to_swap_out.empty()) {
    return;
  }

  // remember id and expiration of everything handed over to the swap manager
  for (const auto& swapped : to_swap_out) {
    swapped_flow_files_.push(SwappedFlowFile{swapped->getUUID(), swapped->getPenaltyExpiration()});
  }
  logger_->log_debug("Initiating store of {} flow files", to_swap_out.size());
  swap_manager_->store(std::move(to_swap_out));
}
// Reports whether a flow file is ready to be handed out at the current time.
//   - queue_ not empty: true iff the penalty of its minimum element has expired;
//   - otherwise, load task pending: true iff the earliest expiration of the
//     load task has passed and its future is already ready;
//   - otherwise: true iff the earliest swapped-out flow file is due.
bool FlowFileQueue::isWorkAvailable() const {
  const auto current_time = clock_->now();

  if (!queue_.empty()) {
    return queue_.min()->getPenaltyExpiration() <= current_time;
  }

  if (load_task_) {
    if (load_task_->min > current_time) {
      // even the earliest flow file being loaded is still penalized
      return false;
    }
    // zero-length wait: only checks whether the load has finished
    return load_task_->items.wait_for(std::chrono::milliseconds{0}) == std::future_status::ready;
  }

  if (swapped_flow_files_.empty()) {
    return false;
  }
  return swapped_flow_files_.min().to_be_processed_after <= current_time;
}
// True when size() reports no flow files at all (in memory, being loaded, or swapped out).
bool FlowFileQueue::empty() const {
  const size_t total_count = size();
  return total_count == 0;
}
// Total number of flow files tracked: those in queue_, those accounted for by
// the pending load task (if any), and those recorded in swapped_flow_files_.
size_t FlowFileQueue::size() const {
  size_t total = queue_.size();
  if (load_task_) {
    total += load_task_->size();
  }
  total += swapped_flow_files_.size();
  return total;
}
// Drops everything this queue keeps track of: the in-memory queue_, the pending
// load task (if any) and the records of swapped-out flow files.
// NOTE(review): swap_manager_ is not called here, so whatever it has stored is
// presumably left as is — confirm that this is intended.
void FlowFileQueue::clear() {
queue_.clear();
// discards the pending load together with its intermediate_items
load_task_.reset();
swapped_flow_files_.clear();
}
void FlowFileQueue::initiateLoadIfNeeded() {
if (load_task_) {
throw std::logic_error("There is already an active load task running");
}
size_t flow_files_count = shouldSwapInCount();
if (flow_files_count == 0) {
return;
}
logger_->log_debug("Initiating load of {} flow files", flow_files_count);
TimePoint min = TimePoint::max();
TimePoint max = TimePoint::min();
std::vector<SwappedFlowFile> flow_files;
flow_files.reserve(flow_files_count);
for (size_t i = 0; i < flow_files_count; ++i) {
SwappedFlowFile flow_file = swapped_flow_files_.popMin();
// TODO(adebreceni): since we are popping in order, we could elide these std::min and std::max comparisons
min = std::min(min, flow_file.to_be_processed_after);
max = std::max(max, flow_file.to_be_processed_after);
flow_files.push_back(flow_file);
}
load_task_ = {min, max, swap_manager_->load(std::move(flow_files)), flow_files_count};
}
// Sets the lower threshold read by shouldSwapInCount(): a swap-in is requested
// once queue_ holds fewer than min_size (and fewer than the target size)
// elements. A value of 0 makes shouldSwapInCount() request all swapped-out
// flow files.
void FlowFileQueue::setMinSize(size_t min_size) {
min_size_ = min_size;
}
// Sets the size that queue_ is brought back to by swapping: shouldSwapOutCount()
// returns the excess over target_size and shouldSwapInCount() the shortfall
// below it. A value of 0 disables swapping out and makes shouldSwapInCount()
// request all swapped-out flow files.
void FlowFileQueue::setTargetSize(size_t target_size) {
target_size_ = target_size;
}
// Sets the upper threshold read by shouldSwapOutCount(): a swap-out is requested
// once queue_ holds more than max_size (and more than the target size)
// elements. A value of 0 disables swapping out.
void FlowFileQueue::setMaxSize(size_t max_size) {
max_size_ = max_size;
}
// Number of flow files that should be moved from queue_ to the swap manager.
// Returns 0 when there is no swap manager, when max_size_ or target_size_ is 0,
// or when queue_ does not exceed both of them; otherwise returns how many
// elements queue_ holds beyond target_size_.
size_t FlowFileQueue::shouldSwapOutCount() const {
  if (!swap_manager_) {
    return 0;
  }

  // read once for consistent view of a single atomic variable
  const size_t max_size = max_size_;
  const size_t target_size = target_size_;

  if (max_size == 0 || target_size == 0) {
    return 0;
  }

  const size_t current_size = queue_.size();
  if (current_size <= max_size || current_size <= target_size) {
    return 0;
  }
  return current_size - target_size;
}
// Number of swapped-out flow files that should be loaded back into queue_.
// Returns 0 when there is no swap manager. When min_size_ or target_size_ is 0,
// every swapped-out flow file is requested. Otherwise a swap-in is requested
// only while queue_ is below both thresholds, limited to the shortfall from
// target_size_ and to the number of swapped-out flow files.
size_t FlowFileQueue::shouldSwapInCount() const {
  if (!swap_manager_) {
    return 0;
  }

  // read once for consistent view of a single atomic variable
  const size_t min_size = min_size_;
  const size_t target_size = target_size_;

  if (min_size == 0 || target_size == 0) {
    if (swapped_flow_files_.empty()) {
      return 0;
    }
    logger_->log_info("Swapping in all the flow files");
    return swapped_flow_files_.size();
  }

  const size_t current_size = queue_.size();
  if (current_size >= min_size || current_size >= target_size) {
    return 0;
  }
  return std::min(target_size - current_size, swapped_flow_files_.size());
}
} // namespace org::apache::nifi::minifi::utils