blob: 393be4a294fe6916e5fbb44efe3e9fbd93650d5d [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
////////////////////////////////////////////////////////////////////
//
// This file consists of ciass definition of a producer consumer queue
//
/////////////////////////////////////////////////////////////////////
#if !defined(__PC_QUEUE_H)
#define __PC_QUEUE_H
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include "basics/sptypes.h"
template <typename T>
class PCQueue {
public:
PCQueue() {}
virtual ~PCQueue() throw() {}
PCQueue(const PCQueue& pcqueue) = delete;
PCQueue& operator=(const PCQueue& pcqueue) = delete;
void enqueue(T _item);
void enqueue_all(T _item, sp_int32 _ntimes);
T dequeue(void);
T trydequeue(bool&);
sp_uint32 trydequeuen(sp_uint32 _n, std::vector<T>& _queue);
sp_int32 size(void);
protected:
std::queue<T> queue_;
std::condition_variable cond_;
std::mutex mutex_;
};
template <typename T>
void PCQueue<T>::enqueue(T _item) {
std::unique_lock<std::mutex> m(mutex_);
queue_.push(_item);
cond_.notify_one();
}
template <typename T>
void PCQueue<T>::enqueue_all(T _item, sp_int32 _ntimes) {
std::unique_lock<std::mutex> m(mutex_);
for (sp_int32 i = 0; i < _ntimes; i++) {
queue_.push(_item);
}
cond_.notify_one();
}
template <typename T>
T PCQueue<T>::dequeue() {
std::unique_lock<std::mutex> m(mutex_);
while (queue_.empty()) cond_.wait(m);
T item = queue_.front();
queue_.pop();
return item;
}
template <typename T>
T PCQueue<T>::trydequeue(bool& _dequeued) {
std::unique_lock<std::mutex> m(mutex_);
if (queue_.empty()) {
_dequeued = false;
return NULL;
}
T item = queue_.front();
queue_.pop();
_dequeued = true;
return item;
}
template <typename T>
sp_uint32 PCQueue<T>::trydequeuen(sp_uint32 _ntodequeue, std::vector<T>& _retval) {
std::unique_lock<std::mutex> m(mutex_);
sp_uint32 dequeued = 0;
while (!queue_.empty() && dequeued < _ntodequeue) {
T item = queue_.front();
queue_.pop();
_retval.push_back(item);
dequeued++;
}
return dequeued;
}
template <typename T>
sp_int32 PCQueue<T>::size(void) {
std::unique_lock<std::mutex> m(mutex_);
sp_int32 retval = queue_.size();
return retval;
}
#endif