blob: c364b7dc831050bc1b23f669dc1ccc1f1255a9c8 [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_QUEUE_HPP__
#define __PROCESS_QUEUE_HPP__
#include <atomic>
#include <deque>
#include <memory>
#include <queue>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <stout/synchronized.hpp>
namespace process {
template <typename T>
class Queue
{
public:
Queue() : data(new Data()) {}
// TODO(bmahler): Take a T&& here instead.
void put(const T& t)
{
// NOTE: We need to grab the promise 'date->promises.front()' but
// set it outside of the critical section because setting it might
// trigger callbacks that try to reacquire the lock.
Owned<Promise<T>> promise;
synchronized (data->lock) {
if (data->promises.empty()) {
data->elements.push(t);
} else {
promise = data->promises.front();
data->promises.pop_front();
}
}
if (promise.get() != nullptr) {
promise->set(t);
}
}
Future<T> get()
{
Future<T> future;
synchronized (data->lock) {
if (data->elements.empty()) {
data->promises.emplace_back(new Promise<T>());
future = data->promises.back()->future();
} else {
T t = std::move(data->elements.front());
data->elements.pop();
return Future<T>(std::move(t));
}
}
// If there were no items available, we set up a discard
// handler. This is done here to minimize the amount of
// work done within the critical section above.
auto weak_data = std::weak_ptr<Data>(data);
future.onDiscard([weak_data, future]() {
auto data = weak_data.lock();
if (!data) {
return;
}
synchronized (data->lock) {
for (auto it = data->promises.begin();
it != data->promises.end();
++it) {
if ((*it)->future() == future) {
(*it)->discard();
data->promises.erase(it);
break;
}
}
}
});
return future;
}
size_t size() const
{
synchronized (data->lock) {
return data->elements.size();
}
}
private:
struct Data
{
Data() = default;
~Data()
{
// TODO(benh): Fail promises?
}
// Rather than use a process to serialize access to the queue's
// internal data we use a 'std::atomic_flag'.
std::atomic_flag lock = ATOMIC_FLAG_INIT;
// Represents "waiters" for elements from the queue.
std::deque<Owned<Promise<T>>> promises;
// Represents elements already put in the queue.
std::queue<T> elements;
};
std::shared_ptr<Data> data;
};
} // namespace process {
#endif // __PROCESS_QUEUE_HPP__