blob: 063dc3a4f7f251c144cd889610a1344a5619fab5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_UTIL_INTERNAL_QUEUE_H
#define IMPALA_UTIL_INTERNAL_QUEUE_H
#include <boost/function.hpp>
#include <boost/thread/locks.hpp>
#include "util/fake-lock.h"
#include "util/spinlock.h"
namespace impala {
/// FIFO queue implemented as a doubly-linked lists with internal pointers. This is in
/// contrast to the STL list which allocates a wrapper Node object around the data. Since
/// it's an internal queue, the list pointers are maintained in the Nodes which is memory
/// owned by the user. The nodes cannot be deallocated while the queue has elements.
/// The internal structure is a doubly-linked list.
/// nullptr <-- N1 <--> N2 <--> N3 --> nullptr
/// (head) (tail)
///
/// InternalQueue<T> instantiates a thread-safe queue where the queue is protected by an
/// internal Spinlock. InternalList<T> instantiates a list with no thread safety.
///
/// To use these data structures, the element to be added to the queue or list must
/// subclass ::Node.
///
/// TODO: this is an ideal candidate to be made lock free.
/// T must be a subclass of InternalQueueBase::Node.
template <typename LockType, typename T>
class InternalQueueBase {
public:
struct Node {
public:
Node() : parent_queue(nullptr), next(nullptr), prev(nullptr) {}
virtual ~Node() {}
/// Returns true if the node is in a queue.
bool in_queue() const { return parent_queue != nullptr; }
/// Returns the Next/Prev node or nullptr if this is the end/front.
T* Next() const {
boost::lock_guard<LockType> lock(parent_queue->lock_);
return reinterpret_cast<T*>(next);
}
T* Prev() const {
boost::lock_guard<LockType> lock(parent_queue->lock_);
return reinterpret_cast<T*>(prev);
}
private:
friend class InternalQueueBase<LockType, T>;
/// Pointer to the queue this Node is on. nullptr if not on any queue.
InternalQueueBase<LockType, T>* parent_queue;
Node* next;
Node* prev;
};
InternalQueueBase() : head_(nullptr), tail_(nullptr), size_(0) {}
/// Returns the element at the head of the list without dequeuing or nullptr
/// if the queue is empty. This is O(1).
T* head() const {
boost::lock_guard<LockType> lock(lock_);
if (empty()) return nullptr;
return reinterpret_cast<T*>(head_);
}
/// Returns the element at the end of the list without dequeuing or nullptr
/// if the queue is empty. This is O(1).
T* tail() {
boost::lock_guard<LockType> lock(lock_);
if (empty()) return nullptr;
return reinterpret_cast<T*>(tail_);
}
/// Enqueue node onto the queue's tail. This is O(1).
void Enqueue(T* n) {
Node* node = (Node*)n;
DCHECK(node->next == nullptr);
DCHECK(node->prev == nullptr);
DCHECK(node->parent_queue == nullptr);
node->parent_queue = this;
{
boost::lock_guard<LockType> lock(lock_);
if (tail_ != nullptr) tail_->next = node;
node->prev = tail_;
tail_ = node;
if (head_ == nullptr) head_ = node;
++size_;
}
}
/// Pushes the node onto the queue's head. This is O(1).
void PushFront(T* n) {
Node* node = (Node*)n;
DCHECK(node->next == nullptr);
DCHECK(node->prev == nullptr);
DCHECK(node->parent_queue == nullptr);
node->parent_queue = this;
{
boost::lock_guard<LockType> lock(lock_);
if (head_ != nullptr) head_->prev = node;
node->next = head_;
head_ = node;
if (tail_ == nullptr) tail_ = node;
++size_;
}
}
/// Dequeues an element from the queue's head. Returns nullptr if the queue
/// is empty. This is O(1).
T* Dequeue() {
Node* result = nullptr;
{
boost::lock_guard<LockType> lock(lock_);
if (empty()) return nullptr;
--size_;
result = head_;
head_ = head_->next;
if (head_ == nullptr) {
tail_ = nullptr;
} else {
head_->prev = nullptr;
}
}
DCHECK(result != nullptr);
result->next = result->prev = nullptr;
result->parent_queue = nullptr;
return reinterpret_cast<T*>(result);
}
/// Dequeues an element from the queue's tail. Returns nullptr if the queue
/// is empty. This is O(1).
T* PopBack() {
Node* result = nullptr;
{
boost::lock_guard<LockType> lock(lock_);
if (empty()) return nullptr;
--size_;
result = tail_;
tail_ = tail_->prev;
if (tail_ == nullptr) {
head_ = nullptr;
} else {
tail_->next = nullptr;
}
}
DCHECK(result != nullptr);
result->next = result->prev = nullptr;
result->parent_queue = nullptr;
return reinterpret_cast<T*>(result);
}
/// Removes 'node' from the queue. This is O(1). No-op if node is
/// not on the list. Returns true if removed
bool Remove(T* n) {
Node* node = (Node*)n;
if (node->parent_queue != this) return false;
{
boost::lock_guard<LockType> lock(lock_);
if (node->next == nullptr && node->prev == nullptr) {
// Removing only node
DCHECK(node == head_);
DCHECK(tail_ == node);
head_ = tail_ = nullptr;
--size_;
node->parent_queue = nullptr;
return true;
}
if (head_ == node) {
DCHECK(node->prev == nullptr);
head_ = node->next;
} else {
DCHECK(node->prev != nullptr);
node->prev->next = node->next;
}
if (node == tail_) {
DCHECK(node->next == nullptr);
tail_ = node->prev;
} else if (node->next != nullptr) {
node->next->prev = node->prev;
}
--size_;
}
node->next = node->prev = nullptr;
node->parent_queue = nullptr;
return true;
}
/// Clears all elements in the list.
void Clear() {
boost::lock_guard<LockType> lock(lock_);
Node* cur = head_;
while (cur != nullptr) {
Node* tmp = cur;
cur = cur->next;
tmp->prev = tmp->next = nullptr;
tmp->parent_queue = nullptr;
}
size_ = 0;
head_ = tail_ = nullptr;
}
int size() const { return size_; }
bool empty() const { return head_ == nullptr; }
/// Returns if the target is on the queue. This is O(1) and does not acquire any locks.
bool Contains(const T* target) const {
return target->parent_queue == this;
}
/// Validates the internal structure of the list
bool Validate() {
int num_elements_found = 0;
boost::lock_guard<LockType> lock(lock_);
if (head_ == nullptr) {
if (tail_ != nullptr) return false;
if (size() != 0) return false;
return true;
}
if (head_->prev != nullptr) return false;
Node* current = head_;
while (current != nullptr) {
if (current->parent_queue != this) return false;
++num_elements_found;
Node* next = current->next;
if (next == nullptr) {
if (current != tail_) return false;
} else {
if (next->prev != current) return false;
}
current = next;
}
if (num_elements_found != size()) return false;
return true;
}
// Iterate over elements of queue, calling 'fn' for each element. If 'fn' returns
// false, terminate iteration. It is invalid to call other InternalQueue methods
// from 'fn'.
void Iterate(boost::function<bool(T*)> fn) {
boost::lock_guard<LockType> lock(lock_);
for (Node* current = head_; current != nullptr; current = current->next) {
if (!fn(reinterpret_cast<T*>(current))) return;
}
}
/// Prints the queue ptrs to a string.
std::string DebugString() {
std::stringstream ss;
ss << "(";
{
boost::lock_guard<LockType> lock(lock_);
Node* curr = head_;
while (curr != nullptr) {
ss << (void*)curr;
curr = curr->next;
}
}
ss << ")";
return ss.str();
}
private:
friend struct Node;
mutable LockType lock_;
Node *head_, *tail_;
int size_;
};
// The default LockType is SpinLock.
template <typename T>
class InternalQueue : public InternalQueueBase<SpinLock, T> {};
// InternalList is a non-threadsafe implementation.
template <typename T>
class InternalList : public InternalQueueBase<FakeLock, T> {};
}
#endif