blob: 7d071d9a6b6a541d612e8f66fd87ca2a77bff835 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
////////////////////////////////////////////////////////////////////
//
// This file contains the class definition of a producer-consumer queue.
//
/////////////////////////////////////////////////////////////////////
#if !defined(__PC_QUEUE_H)
#define __PC_QUEUE_H
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include "basics/sptypes.h"
// PCQueue is a producer-consumer queue holding items of type T.
//
// Every public operation acquires mutex_ before touching queue_.
// dequeue() blocks on cond_ while the queue is empty; the try* variants
// never wait for an item to arrive.
template <typename T>
class PCQueue {
 public:
  PCQueue() {}

  // `noexcept` replaces the dynamic exception specification `throw()`,
  // which is deprecated since C++11 and removed from the language in C++20.
  virtual ~PCQueue() noexcept {}

  // The queue is not copyable.
  PCQueue(const PCQueue& pcqueue) = delete;
  PCQueue& operator=(const PCQueue& pcqueue) = delete;

  // Appends _item to the tail of the queue and signals a waiting consumer.
  void enqueue(T _item);

  // Appends _ntimes copies of _item to the tail of the queue.
  void enqueue_all(T _item, sp_int32 _ntimes);

  // Blocks until an item is available, then removes and returns the head.
  T dequeue(void);

  // Non-blocking dequeue. The bool argument is set to true when an item was
  // removed and false when the queue was empty; the returned value is only
  // meaningful when the flag is true.
  T trydequeue(bool&);

  // Non-blocking bulk dequeue: removes at most _n items from the head,
  // appends them to _queue in queue order and returns how many were removed.
  sp_uint32 trydequeuen(sp_uint32 _n, std::vector<T>& _queue);

  // Returns the number of items currently in the queue.
  sp_int32 size(void);

 protected:
  // Underlying FIFO storage; accessed only while mutex_ is held.
  std::queue<T> queue_;
  // Signalled by the enqueue operations; waited on by dequeue().
  std::condition_variable cond_;
  // Guards queue_.
  std::mutex mutex_;
};
// Appends _item to the tail of the queue and signals one thread waiting
// on the condition variable.
template <typename T>
void PCQueue<T>::enqueue(T _item) {
  std::lock_guard<std::mutex> guard(mutex_);
  queue_.push(_item);
  // The notification is issued while the mutex is still held.
  cond_.notify_one();
}
// Appends _ntimes copies of _item to the tail of the queue under a single
// lock acquisition, then wakes every thread waiting on the condition
// variable.
//
// _item   - the value to enqueue
// _ntimes - how many copies to enqueue; values <= 0 enqueue nothing
template <typename T>
void PCQueue<T>::enqueue_all(T _item, sp_int32 _ntimes) {
  std::unique_lock<std::mutex> m(mutex_);
  for (sp_int32 i = 0; i < _ntimes; i++) {
    queue_.push(_item);
  }
  // Several items may have become available at once, so wake all waiters
  // rather than just one; otherwise consumers blocked in dequeue() could
  // keep sleeping while items sit in the queue. Waiters that find the queue
  // drained again simply go back to waiting.
  cond_.notify_all();
}
// Removes and returns the item at the head of the queue, blocking the
// caller for as long as the queue is empty.
template <typename T>
T PCQueue<T>::dequeue() {
  std::unique_lock<std::mutex> lock(mutex_);
  // The predicate form re-checks emptiness after every wakeup, so spurious
  // wakeups and items taken by other consumers are handled.
  cond_.wait(lock, [this] { return !queue_.empty(); });
  T head = queue_.front();
  queue_.pop();
  return head;
}
// Non-blocking dequeue.
//
// _dequeued - set to true when an item was removed from the queue and to
//             false when the queue was empty
//
// Returns the removed head item when _dequeued is true. When the queue is
// empty a value-initialized T is returned (a null pointer for pointer types,
// zero for arithmetic types); callers must consult _dequeued to tell the
// two cases apart.
template <typename T>
T PCQueue<T>::trydequeue(bool& _dequeued) {
  std::unique_lock<std::mutex> m(mutex_);
  if (queue_.empty()) {
    _dequeued = false;
    // T() instead of NULL: NULL is only convertible to pointer-like or
    // integral T, and silently builds a std::string from a null pointer
    // (undefined behaviour). Value-initialization works for any
    // default-constructible T and yields the same result for pointers
    // and arithmetic types.
    return T();
  }
  T item = queue_.front();
  queue_.pop();
  _dequeued = true;
  return item;
}
// Non-blocking bulk dequeue: removes up to _ntodequeue items from the head
// of the queue and appends them to _retval in queue order.
//
// Returns the number of items actually removed, which is smaller than
// _ntodequeue when the queue runs empty first.
template <typename T>
sp_uint32 PCQueue<T>::trydequeuen(sp_uint32 _ntodequeue, std::vector<T>& _retval) {
  std::lock_guard<std::mutex> guard(mutex_);
  sp_uint32 taken = 0;
  for (; taken < _ntodequeue && !queue_.empty(); ++taken) {
    T head = queue_.front();
    queue_.pop();
    _retval.push_back(head);
  }
  return taken;
}
// Returns the number of items in the queue at the time of the call,
// converted to sp_int32. The count is read while holding the mutex.
template <typename T>
sp_int32 PCQueue<T>::size(void) {
  std::lock_guard<std::mutex> guard(mutex_);
  return static_cast<sp_int32>(queue_.size());
}
#endif