blob: 559164a20cd1ecf643b190fbc2127d5a315c4cb6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
#define LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_
#include <chrono>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <stdexcept>
#include "utils/TryMoveCall.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace utils {
// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
// Guarantees elements to be dequeued in order of insertion
template <typename T>
class ConcurrentQueue {
public:
ConcurrentQueue() = default;
ConcurrentQueue(const ConcurrentQueue& other) = delete;
ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete;
ConcurrentQueue(ConcurrentQueue&& other)
: ConcurrentQueue(std::move(other), std::lock_guard<std::mutex>(other.mutex_)) {}
ConcurrentQueue& operator=(ConcurrentQueue&& other) {
if (this != &other) {
std::lock(mtx_, other.mtx_);
std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
queue_.swap(other.queue_);
}
return *this;
}
bool tryDequeue(T& out) {
std::unique_lock<std::mutex> lck(mtx_);
return tryDequeueImpl(lck, out);
}
template<typename Functor>
bool consume(Functor&& fun) {
std::unique_lock<std::mutex> lck(mtx_);
return consumeImpl(std::move(lck), std::forward<Functor>(fun));
}
bool empty() const {
std::unique_lock<std::mutex> lck(mtx_);
return emptyImpl(lck);
}
size_t size() const {
std::lock_guard<std::mutex> guard(mtx_);
return queue_.size();
}
void clear() {
std::lock_guard<std::mutex> guard(mtx_);
queue_.clear();
}
template <typename... Args>
void enqueue(Args&&... args) {
std::lock_guard<std::mutex> guard(mtx_);
queue_.emplace_back(std::forward<Args>(args)...);
}
private:
ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard<std::mutex>&)
: queue_(std::move(other.queue_)) {}
protected:
void checkLock(std::unique_lock<std::mutex>& lck) const {
if (!lck.owns_lock()) {
throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
}
}
// Warning: this function copies if T is not nothrow move constructible
bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
checkLock(lck);
if (queue_.empty()) {
return false;
}
out = std::move_if_noexcept(queue_.front());
queue_.pop_front();
return true;
}
// Warning: this function copies if T is not nothrow move constructible
template<typename Functor>
bool consumeImpl(std::unique_lock<std::mutex>&& lock_to_adopt, Functor&& fun) {
std::unique_lock<std::mutex> lock(std::move(lock_to_adopt));
checkLock(lock);
if (queue_.empty()) {
return false;
}
T elem = std::move_if_noexcept(queue_.front());
queue_.pop_front();
lock.unlock();
TryMoveCall<Functor, T>::call(std::forward<Functor>(fun), elem);
return true;
}
bool emptyImpl(std::unique_lock<std::mutex>& lck) const {
checkLock(lck);
return queue_.empty();
}
mutable std::mutex mtx_;
private:
std::deque<T> queue_;
};
// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data
// Stopping interrupts all consumers without a chance to consume remaining elements in the queue although elements can still be enqueued
// Started means queued elements can be consumed/dequeued and dequeueWait* calls can block
template <typename T>
class ConditionConcurrentQueue : private ConcurrentQueue<T> {
public:
explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue<T>{}, running_{start} {}
ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete;
ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete;
ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete;
ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete;
using ConcurrentQueue<T>::size;
using ConcurrentQueue<T>::empty;
using ConcurrentQueue<T>::clear;
template <typename... Args>
void enqueue(Args&&... args) {
ConcurrentQueue<T>::enqueue(std::forward<Args>(args)...);
if (running_) {
cv_.notify_one();
}
}
bool dequeueWait(T& out) {
std::unique_lock<std::mutex> lck(this->mtx_);
cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
template<typename Functor>
bool consumeWait(Functor&& fun) {
std::unique_lock<std::mutex> lck(this->mtx_);
cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped
return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
}
template< class Rep, class Period >
bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {
std::unique_lock<std::mutex> lck(this->mtx_);
cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
bool dequeueWaitUntil(T& out, const std::chrono::system_clock::time_point& time) {
std::unique_lock<std::mutex> lck(this->mtx_);
cv_.wait_until(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
template<typename Functor, class Rep, class Period>
bool consumeWaitFor(Functor&& fun, const std::chrono::duration<Rep, Period>& time) {
std::unique_lock<std::mutex> lck(this->mtx_);
cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do
return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
}
template<typename Functor>
bool consumeWaitUntil(Functor&& fun, const std::chrono::system_clock::time_point& time) {
std::unique_lock<std::mutex> lck(this->mtx_);
cv_.wait_until(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do
return running_ && ConcurrentQueue<T>::consumeImpl(std::move(lck), std::forward<Functor>(fun));
}
bool tryDequeue(T& out) {
std::unique_lock<std::mutex> lck(this->mtx_);
return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
}
void stop() {
std::lock_guard<std::mutex> guard(this->mtx_);
running_ = false;
cv_.notify_all();
}
void start() {
std::unique_lock<std::mutex> lck(this->mtx_);
running_ = true;
}
bool isRunning() const {
std::lock_guard<std::mutex> guard(this->mtx_);
return running_; // In case it's not running no notifications are generated, dequeueing fails instead of blocking to avoid hanging threads
}
private:
bool running_;
std::condition_variable cv_;
};
} // namespace utils
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
#endif // LIBMINIFI_INCLUDE_UTILS_MINIFICONCURRENTQUEUE_H_