// blob: 7331c12159c3b307c109afaa4fc1e915610d9ad1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
#define KUDU_UTIL_BLOCKING_QUEUE_H
#include <list>
#include <string>
#include <type_traits>
#include <unistd.h>
#include <vector>
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
namespace kudu {
// Outcome of a non-blocking BlockingQueue::Put() call.
//
// Note that QUEUE_SUCCESS is 0, so a QueueStatus converted to bool is 'false'
// on success. Always compare against QUEUE_SUCCESS explicitly.
enum QueueStatus {
  // The value was inserted into the queue.
  QUEUE_SUCCESS = 0,

  // The queue had already been shut down.
  QUEUE_SHUTDOWN = 1,

  // The queue had reached its maximum size.
  QUEUE_FULL = 2
};
// Default logical-size policy: every element counts as exactly one unit,
// whatever its type or value. A queue using this policy therefore measures
// its size as a plain element count.
struct DefaultLogicalSize {
  template<typename Element>
  static size_t logical_size(const Element& /* element */) {
    const size_t kUnitSize = 1;
    return kUnitSize;
  }
};
// A bounded FIFO queue whose state is protected by an internal mutex.
//
// Capacity is measured in "logical size" units: each element contributes
// LOGICAL_SIZE::logical_size(element) to a running total, and a new element is
// accepted only while that total is below 'max_size'. With the default policy
// every element counts as 1, so 'max_size' is simply an element count.
//
// Consumers block in BlockingGet()/BlockingDrainTo() while the queue is empty,
// and producers block in BlockingPut() while it is full. Shutdown() wakes up
// all waiters and rejects further insertions; elements which are already
// queued can still be consumed afterwards.
template <typename T, class LOGICAL_SIZE = DefaultLogicalSize>
class BlockingQueue {
 public:
  // If T is a pointer, this will be the pointee type. If T is not a pointer,
  // you can ignore this and the gscoped_ptr overloads which make use of it.
  typedef typename std::remove_pointer<T>::type T_VAL;

  // Creates an empty queue which accepts new elements while its total logical
  // size is below 'max_size'.
  explicit BlockingQueue(size_t max_size)
    : shutdown_(false),
      size_(0),
      max_size_(max_size),
      not_empty_(&lock_),
      not_full_(&lock_) {
  }

  // If the queue holds a bare pointer, it must be empty on destruction, since
  // it may have ownership of the pointer.
  ~BlockingQueue() {
    DCHECK(list_.empty() || !std::is_pointer<T>::value)
        << "BlockingQueue holds bare pointers at destruction time";
  }

  // Removes the element at the head of the queue and stores it in '*out',
  // blocking while the queue is empty.
  //
  // Returns true if an element was retrieved. Returns false if the queue is
  // empty and has been shut down; elements enqueued before the shutdown are
  // still handed out.
  bool BlockingGet(T *out) {
    MutexLock l(lock_);
    while (true) {
      if (!list_.empty()) {
        *out = list_.front();
        list_.pop_front();
        decrement_size_unlocked(*out);
        not_full_.Signal();
        return true;
      }
      if (shutdown_) {
        return false;
      }
      not_empty_.Wait();
    }
  }

  // Same as the BlockingGet() overload above, except that the retrieved
  // pointer is handed over to '*out'. Returns false, leaving '*out' untouched,
  // if the queue is empty and has been shut down.
  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
    T t = nullptr;
    bool got_element = BlockingGet(&t);
    if (!got_element) {
      return false;
    }
    out->reset(t);
    return true;
  }

  // Get all elements from the queue and append them to a vector.
  //
  // If 'deadline' passes and no elements have been returned from the
  // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
  // no deadline is used.
  //
  // If the queue has been shut down, but there are still elements waiting,
  // then it returns those elements as if the queue were not yet shut down.
  //
  // Returns:
  // - OK if successful
  // - TimedOut if the deadline passed
  // - Aborted if the queue shut down
  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
    MutexLock l(lock_);
    while (true) {
      if (!list_.empty()) {
        out->reserve(list_.size());
        for (const T& elt : list_) {
          out->push_back(elt);
          decrement_size_unlocked(elt);
        }
        list_.clear();
        // The whole queue was emptied, so there may be room for many blocked
        // producers at once: wake all of them rather than just one. Each
        // waiter re-checks the capacity under the lock, so extra wake-ups are
        // harmless.
        not_full_.Broadcast();
        return Status::OK();
      }
      if (PREDICT_FALSE(shutdown_)) {
        return Status::Aborted("");
      }
      if (!deadline.Initialized()) {
        not_empty_.Wait();
      } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
        return Status::TimedOut("");
      }
    }
  }

  // Attempts to put the given value in the queue without blocking.
  // Returns:
  //   QUEUE_SUCCESS: if successfully inserted
  //   QUEUE_FULL: if the queue has reached max_size
  //   QUEUE_SHUTDOWN: if someone has already called Shutdown()
  //
  // The capacity check is made first, so a queue which is both full and shut
  // down reports QUEUE_FULL.
  QueueStatus Put(const T &val) {
    MutexLock l(lock_);
    if (size_ >= max_size_) {
      return QUEUE_FULL;
    }
    if (shutdown_) {
      return QUEUE_SHUTDOWN;
    }
    list_.push_back(val);
    increment_size_unlocked(val);
    // Release the lock before signaling.
    l.Unlock();
    not_empty_.Signal();
    return QUEUE_SUCCESS;
  }

  // Returns the same as the other Put() overload above.
  // If the element was inserted, the gscoped_ptr releases its contents.
  QueueStatus Put(gscoped_ptr<T_VAL> *val) {
    QueueStatus s = Put(val->get());
    if (s == QUEUE_SUCCESS) {
      ignore_result(val->release());
    }
    return s;
  }

  // Puts an element into the queue; if the queue is full, blocks until
  // space becomes available. Returns false if we were shut down prior
  // to enqueueing the element.
  bool BlockingPut(const T& val) {
    MutexLock l(lock_);
    while (true) {
      if (shutdown_) {
        return false;
      }
      if (size_ < max_size_) {
        list_.push_back(val);
        increment_size_unlocked(val);
        // Release the lock before signaling.
        l.Unlock();
        not_empty_.Signal();
        return true;
      }
      not_full_.Wait();
    }
  }

  // Same as other BlockingPut() overload above. If the element was
  // enqueued, gscoped_ptr releases its contents; otherwise the gscoped_ptr
  // keeps ownership.
  bool BlockingPut(gscoped_ptr<T_VAL>* val) {
    // This must delegate to BlockingPut() rather than Put(): Put() does not
    // block, and its QueueStatus result converts to 'false' on success
    // (QUEUE_SUCCESS == 0) and to 'true' on failure, which would invert the
    // ownership transfer below.
    bool enqueued = BlockingPut(val->get());
    if (enqueued) {
      ignore_result(val->release());
    }
    return enqueued;
  }

  // Shut down the queue.
  // When a blocking queue is shut down, no more elements can be added to it:
  // Put() will return QUEUE_SHUTDOWN and BlockingPut() will return false.
  // Existing elements will drain out of it, and then BlockingGet will start
  // returning false.
  void Shutdown() {
    MutexLock l(lock_);
    shutdown_ = true;
    // Wake up every blocked producer and consumer so that each of them can
    // observe the shutdown flag.
    not_full_.Broadcast();
    not_empty_.Broadcast();
  }

  // Returns true if the queue currently holds no elements.
  bool empty() const {
    MutexLock l(lock_);
    return list_.empty();
  }

  // Returns the maximum total logical size passed to the constructor.
  size_t max_size() const {
    return max_size_;
  }

  // Returns the concatenation of ToString() of every queued element, each
  // followed by a newline. Only usable when 't->ToString()' is valid for the
  // element type T.
  std::string ToString() const {
    std::string ret;
    MutexLock l(lock_);
    for (const T& t : list_) {
      ret.append(t->ToString());
      ret.append("\n");
    }
    return ret;
  }

 private:
  // Increments queue size. Must be called when 'lock_' is held.
  void increment_size_unlocked(const T& t) {
    size_ += LOGICAL_SIZE::logical_size(t);
  }

  // Decrements queue size. Must be called when 'lock_' is held.
  void decrement_size_unlocked(const T& t) {
    size_ -= LOGICAL_SIZE::logical_size(t);
  }

  // Set by Shutdown(); once true, no further elements are accepted.
  bool shutdown_;

  // Sum of the logical sizes of the elements currently in 'list_'.
  size_t size_;

  // New elements are accepted only while 'size_' is below this value.
  size_t max_size_;

  // Protects 'shutdown_', 'size_' and 'list_'.
  mutable Mutex lock_;

  // Signaled when an element is added, and broadcast on shutdown.
  ConditionVariable not_empty_;

  // Signaled when elements are removed, and broadcast on shutdown.
  ConditionVariable not_full_;

  // The queued elements, in FIFO order.
  std::list<T> list_;
};
} // namespace kudu
#endif