blob: ae79c850882cae859b30431f035d421912893a32 [file] [log] [blame]
/************************************************************
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*************************************************************/
#ifndef SINGA_UTILS_SAFE_QUEUE_H_
#define SINGA_UTILS_SAFE_QUEUE_H_
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
/**
 * Thread-safe queue.
 *
 * Every operation takes an internal mutex, so one instance can be shared by
 * several producer and consumer threads. Consumers blocked in Pop() are woken
 * through a condition variable that Push() signals.
 *
 * @tparam T element type; Push() copies it in and the pop operations
 *         copy-assign it out.
 * @tparam Container underlying container. It must provide push(), pop(),
 *         front(), empty() and size(), as std::queue does.
 */
template <typename T, class Container = std::queue<T>>
class SafeQueue {
 public:
  SafeQueue() = default;

  /**
   * Acquires and releases the mutex once before the members are destroyed, so
   * destruction waits for an operation that currently holds the lock.
   * It does not wake threads that are blocked in Pop().
   */
  ~SafeQueue() { std::lock_guard<std::mutex> lock(mutex_); }

  /**
   * Push a copy of an element into the queue and wake one waiting consumer.
   * The only blocking involved is acquiring the internal mutex.
   * @param[in] e element to copy into the queue.
   * @return always true.
   */
  bool Push(const T& e) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(e);
    condition_.notify_one();
    return true;
  }

  /**
   * Pop the head element from the queue.
   * Blocks until an element is available.
   * @param[out] e receives the popped element.
   */
  void Pop(T& e) {
    std::unique_lock<std::mutex> lock(mutex_);
    condition_.wait(lock, [this]() { return !queue_.empty(); });
    e = queue_.front();
    queue_.pop();
  }

  /**
   * Pop the head element from the queue, waiting at most a given time.
   * @param[out] item receives the popped element; untouched on failure.
   * @param[in] timeout maximum time to wait, in microseconds; 0 means do not
   *            wait at all.
   * @return true if an element was popped; false if the queue was still empty
   *         when the timeout expired.
   */
  bool Pop(T& item, std::uint64_t timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    const auto has_item = [this]() { return !queue_.empty(); };
    if (!has_item()) {
      if (timeout == 0) return false;
      // The predicate overload re-checks the queue after every wakeup. A
      // spurious wakeup, or another consumer taking the element first, must
      // not fall through to front()/pop() on an empty container.
      if (!condition_.wait_for(lock, std::chrono::microseconds(timeout),
                               has_item)) {
        return false;
      }
    }
    item = queue_.front();
    queue_.pop();
    return true;
  }

  /**
   * Try to pop the head element without waiting for one to arrive.
   * @param[out] e receives the popped element; untouched on failure.
   * @return false if the queue is empty now.
   */
  bool TryPop(T& e) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    e = queue_.front();
    queue_.pop();
    return true;
  }

  /**
   * @return Number of elements in the queue at the time of the call.
   */
  unsigned int Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    // The public return type is unsigned int; make the narrowing explicit.
    return static_cast<unsigned int>(queue_.size());
  }

 private:
  Container queue_;
  // mutable so that the const Size() can lock it.
  mutable std::mutex mutex_;
  std::condition_variable condition_;
};
/**
 * Thread-safe priority queue.
 *
 * Elements are stored together with an integer priority and handed back
 * largest priority first. Locking and waiting are delegated to SafeQueue; the
 * ordering comes from an internal std::priority_queue, so the relative order
 * of elements with equal priority is unspecified.
 *
 * @tparam T element type; it must be default-constructible and copyable.
 */
template <typename T>
class PriorityQueue {
 public:
  PriorityQueue() = default;

  /**
   * Push an element into the queue with a given priority.
   * @param[in] e element to copy into the queue.
   * @param[in] priority larger values are popped first.
   * @return the result of the underlying SafeQueue::Push().
   */
  bool Push(const T& e, int priority) {
    return queue_.Push(Element{e, priority});
  }

  /**
   * Pop the element with the highest priority.
   * Blocks until an element is available.
   * @param[out] e receives the popped element.
   */
  void Pop(T& e) {
    Element ele;
    queue_.Pop(ele);
    e = ele.data;
  }

  /**
   * Pop the element with the highest priority, waiting at most a given time.
   * @param[out] e receives the popped element; untouched on failure.
   * @param[in] timeout maximum time to wait, in microseconds.
   * @return false if no element is popped within the timeout.
   */
  bool Pop(T& e, std::uint64_t timeout) {
    Element ele;
    if (!queue_.Pop(ele, timeout)) return false;
    e = ele.data;
    return true;
  }

  /**
   * Try to pop the element with the highest priority without waiting.
   * @param[out] e receives the popped element; untouched on failure.
   * @return false if the queue is empty now.
   */
  bool TryPop(T& e) {
    Element ele;
    if (!queue_.TryPop(ele)) return false;
    e = ele.data;
    return true;
  }

  /**
   * @return Number of elements in the queue.
   */
  unsigned int Size() const { return queue_.Size(); }

 private:
  /** Payload paired with its priority; ordered by priority only. */
  struct Element {
    T data;
    int priority;
    inline bool operator<(const Element& other) const {
      return priority < other.priority;
    }
  };

  /**
   * Adapter that gives std::priority_queue the container interface SafeQueue
   * uses. SafeQueue reads the head element through front(), which
   * std::priority_queue does not provide (it calls it top()).
   */
  class Heap {
   public:
    bool empty() const { return heap_.empty(); }
    typename std::priority_queue<Element>::size_type size() const {
      return heap_.size();
    }
    const Element& front() const { return heap_.top(); }
    void push(const Element& ele) { heap_.push(ele); }
    void pop() { heap_.pop(); }

   private:
    std::priority_queue<Element> heap_;
  };

  SafeQueue<Element, Heap> queue_;
};
#endif // SINGA_UTILS_SAFE_QUEUE_H_