blob: 3477c6c7d0f6259faff9d00d79e89c544962989f [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_RUN_QUEUE_HPP__
#define __PROCESS_RUN_QUEUE_HPP__
// At _configuration_ (i.e., build) time you can specify a few
// optimizations:
//
// (1) --enable-lock-free-run-queue (autotools) or
// -DENABLE_LOCK_FREE_RUN_QUEUE (cmake) which enables the
// lock-free run queue implementation (see below for more details).
//
// (2) --enable-last-in-first-out-fixed-size-semaphore (autotools) or
// -DENABLE_LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE (cmake) which
// enables an optimized semaphore implementation (see semaphore.hpp
// for more details).
//
// By default we use the `LockingRunQueue` and
// `DecomissionableKernelSemaphore`.
//
// We choose to make these _compile-time_ decisions rather than
// _runtime_ decisions because we wanted the run queue implementation
// to be compile-time optimized (e.g., inlined, etc).
#ifdef LOCK_FREE_RUN_QUEUE
#include <concurrentqueue.h>
#endif // LOCK_FREE_RUN_QUEUE
#include <algorithm>
#include <list>
#include <process/process.hpp>
#include <stout/synchronized.hpp>
#include "semaphore.hpp"
namespace process {
#ifndef LOCK_FREE_RUN_QUEUE
class RunQueue
{
public:
bool extract(ProcessBase* process)
{
synchronized (mutex) {
std::list<ProcessBase*>::iterator it = std::find(
processes.begin(),
processes.end(),
process);
if (it != processes.end()) {
processes.erase(it);
return true;
}
}
return false;
}
void wait()
{
semaphore.wait();
}
void enqueue(ProcessBase* process)
{
synchronized (mutex) {
processes.push_back(process);
}
epoch.fetch_add(1);
semaphore.signal();
}
// Precondition: `wait` must get called before `dequeue`!
ProcessBase* dequeue()
{
synchronized (mutex) {
if (!processes.empty()) {
ProcessBase* process = processes.front();
processes.pop_front();
return process;
}
}
return nullptr;
}
// NOTE: this function can't be const because `synchronized (mutex)`
// is not const ...
bool empty()
{
synchronized (mutex) {
return processes.empty();
}
}
void decomission()
{
semaphore.decomission();
}
size_t capacity() const
{
return semaphore.capacity();
}
// Epoch used to capture changes to the run queue when settling.
std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
private:
std::list<ProcessBase*> processes;
std::mutex mutex;
// Semaphore used for threads to wait.
#ifndef LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
DecomissionableKernelSemaphore semaphore;
#else
DecomissionableLastInFirstOutFixedSizeSemaphore semaphore;
#endif // LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
};
#else // LOCK_FREE_RUN_QUEUE
class RunQueue
{
public:
bool extract(ProcessBase*)
{
// NOTE: moodycamel::ConcurrentQueue does not provide a way to
// implement extract so we simply return false here.
return false;
}
void wait()
{
semaphore.wait();
}
void enqueue(ProcessBase* process)
{
queue.enqueue(process);
epoch.fetch_add(1);
semaphore.signal();
}
// Precondition: `wait` must get called before `dequeue`!
ProcessBase* dequeue()
{
// NOTE: we loop _forever_ until we actually dequeue a process
// because the contract for using the run queue is that `wait`
// must be called first so we know that there is something to be
// dequeued or the run queue has been decommissioned and we should
// just return `nullptr`.
ProcessBase* process = nullptr;
while (!queue.try_dequeue(process)) {
if (semaphore.decomissioned()) {
break;
}
}
return process;
}
bool empty() const
{
return queue.size_approx() == 0;
}
void decomission()
{
semaphore.decomission();
}
size_t capacity() const
{
return semaphore.capacity();
}
// Epoch used to capture changes to the run queue when settling.
std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
private:
moodycamel::ConcurrentQueue<ProcessBase*> queue;
#ifndef LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
DecomissionableKernelSemaphore semaphore;
#else
DecomissionableLastInFirstOutFixedSizeSemaphore semaphore;
#endif // LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
};
#endif // LOCK_FREE_RUN_QUEUE
} // namespace process {
#endif // __PROCESS_RUN_QUEUE_HPP__