// blob: 69d90f3e1b16189e0b1a6f1981e8509c18b38465 [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_LOOP_HPP__
#define __PROCESS_LOOP_HPP__
#include <mutex>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
namespace process {
// Provides an asynchronous "loop" abstraction. This abstraction is
// helpful for code that would have synchronously been written as a
// loop but asynchronously ends up being a recursive set of functions
// which depending on the compiler may result in a stack overflow
// (i.e., a compiler that can't do sufficient tail call optimization
// may add stack frames for each recursive call).
//
// The loop abstraction takes an optional PID `pid` and uses it as the
// execution context to run the loop. The implementation does a
// `defer` on this `pid` to "pop" the stack when it needs to
// asynchronously recurse. This also lets callers synchronize
// execution with other code dispatching and deferring using `pid`. If
// `None` is passed for `pid` then no `defer` is done and the stack
// will still "pop" but be restarted from the execution context
// wherever the blocked future is completed. This is usually very safe
// when that blocked future will be completed by the IO thread, but
// should not be used if it's completed by another process (because
// you'll block that process until the next time the loop blocks).
//
// The two functions passed to the loop represent the loop "iterate"
// step and the loop "body" step respectively. Each invocation of
// "iterate" returns the next value and the "body" returns whether or
// not to continue looping (as well as any other processing necessary
// of course). You can think of this synchronously as:
//
//     bool condition = true;
//     do {
//       condition = body(iterate());
//     } while (condition);
//
// Asynchronously using recursion this might look like:
//
//     Future<Nothing> loop()
//     {
//       return iterate()
//         .then([](T t) {
//           return body(t)
//             .then([](bool condition) {
//               if (condition) {
//                 return loop();
//               } else {
//                 return Nothing();
//               }
//             });
//         });
//     }
//
// And asynchronously using `pid` as the execution context:
//
//     Future<Nothing> loop()
//     {
//       return iterate()
//         .then(defer(pid, [](T t) {
//           return body(t)
//             .then(defer(pid, [](bool condition) {
//               if (condition) {
//                 return loop();
//               } else {
//                 return Nothing();
//               }
//             }));
//         }));
//     }
//
// And now what this looks like using `loop`:
//
//     loop(pid,
//          []() {
//            return iterate();
//          },
//          [](T t) {
//            return body(t);
//          });
//
// One difference between the `loop` version of the "body" versus the
// other non-loop examples above is the return value is not `bool` or
// `Future<bool>` but rather `ControlFlow<V>` or
// `Future<ControlFlow<V>>`. This enables you to return values out of
// the loop via a `Break(...)`, for example:
//
//     loop(pid,
//          []() {
//            return iterate();
//          },
//          [](T t) {
//            if (finished(t)) {
//              return Break(SomeValue());
//            }
//            return Continue();
//          });
//
// The template parameters `T`, `CF` and `V` are normally left to
// their defaults: `T` is derived from the result type of `iterate()`,
// `CF` from the result type of `body(T)` (both passed through
// `internal::unwrap`, which presumably strips an outer `Future` --
// TODO confirm) and `V` is `CF::ValueType`, i.e., the type of the
// value carried out of the loop by `Break(...)`.
//
// The returned future is set with the value of the `Break`, and is
// failed or discarded if a future returned from `iterate` or `body`
// fails or is discarded.
template <typename Iterate,
typename Body,
typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
typename V = typename CF::ValueType>
Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
// Convenience overload of `loop` for callers that have no execution
// context of their own: a fresh `ProcessBase` is spawned to serve as
// the `pid` for the loop and is terminated once the loop's future
// transitions (for any reason).
template <typename Iterate,
          typename Body,
          typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
          typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
          typename V = typename CF::ValueType>
Future<V> loop(Iterate&& iterate, Body&& body)
{
  // The `true` argument makes libprocess own and free the new
  // `ProcessBase`, so there is nothing for us to `wait` on or
  // `delete` after terminating it below.
  UPID context = spawn(new ProcessBase(), true);

  // Run the loop using the freshly spawned process as its context.
  Future<V> result = loop<Iterate, Body, T, CF, V>(
      context,
      std::forward<Iterate>(iterate),
      std::forward<Body>(body));

  // Tear down the execution context as soon as the loop is done.
  result.onAny([context]() {
    terminate(context);
  });

  return result;
}
// Generic "control flow" construct that is leveraged by
// implementations such as `loop`. At a high-level a `ControlFlow`
// represents some control flow statement such as `continue` or
// `break`, however, these statements can both have values or be
// value-less (i.e., these are meant to be composed "functionally" so
// the representation of `break` captures a value that "exits the
// current function" but the representation of `continue` does not).
//
// The pattern here is to define the type/representation of control
// flow statements within the `ControlFlow` class (e.g.,
// `ControlFlow::Continue` and `ControlFlow::Break`) but also provide
// "syntactic sugar" to make it easier to use at the call site (e.g.,
// the functions `Continue()` and `Break(...)`).
template <typename T>
class ControlFlow
{
public:
  // The type of the value carried by a `BREAK` statement.
  using ValueType = T;

  enum class Statement
  {
    CONTINUE,
    BREAK
  };

  // Representation of a value-less `continue`. Since it carries no
  // value it converts to a `ControlFlow<U>` for any `U`; the resulting
  // control flow holds `None()` as its value.
  class Continue
  {
  public:
    Continue() = default;

    template <typename U>
    operator ControlFlow<U>() const
    {
      return ControlFlow<U>(ControlFlow<U>::Statement::CONTINUE, None());
    }
  };

  // Representation of a `break` carrying a value of type `T`. Converts
  // to a `ControlFlow<U>`, copying the value when converted from an
  // lvalue and moving it when converted from an rvalue.
  class Break
  {
  public:
    Break(T t) : t(std::move(t)) {}

    template <typename U>
    operator ControlFlow<U>() const &
    {
      return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, t);
    }

    template <typename U>
    operator ControlFlow<U>() &&
    {
      return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, std::move(t));
    }

  private:
    T t;
  };

  ControlFlow(Statement s, Option<T> t) : s(s), t(std::move(t)) {}

  // Returns which statement (`CONTINUE` or `BREAK`) this represents.
  Statement statement() const { return s; }

  // Accessors for the carried value. A `CONTINUE` is constructed with
  // `None()` so these are only meaningful for a `BREAK`.
  //
  // NOTE: within a member function `t` is always an lvalue, even in
  // the `&&`-qualified overloads, so `t.get()` yields an lvalue
  // reference there. We must explicitly `std::move` the result for it
  // to bind to the rvalue reference return type; without the cast
  // these overloads fail to compile once instantiated.
  T& value() & { return t.get(); }
  const T& value() const & { return t.get(); }
  T&& value() && { return std::move(t.get()); }
  const T&& value() const && { return std::move(t.get()); }

private:
  Statement s;
  Option<T> t;
};
// "Syntactic sugar" for `ControlFlow::Continue`: a `Continue()` at the
// call site implicitly converts to whichever `ControlFlow<T>` the
// surrounding code expects.
struct Continue
{
  Continue() = default;

  template <typename T>
  operator ControlFlow<T>() const
  {
    // Delegate to the value-less representation nested in the target
    // `ControlFlow<T>`, which in turn converts to `ControlFlow<T>`.
    using ContinueStatement = typename ControlFlow<T>::Continue;
    return ContinueStatement();
  }
};
// "Syntactic sugar" for `ControlFlow::Break`: wraps `t` in the `Break`
// representation of the `ControlFlow` whose value type is the decayed
// type of the argument, perfectly forwarding `t` into it.
template <typename T>
typename ControlFlow<typename std::decay<T>::type>::Break Break(T&& t)
{
  using Value = typename std::decay<T>::type;
  using BreakStatement = typename ControlFlow<Value>::Break;

  return BreakStatement(std::forward<T>(t));
}
// Value-less flavor of `Break(...)`: breaks out with a `Nothing`.
inline ControlFlow<Nothing>::Break Break()
{
  using BreakStatement = ControlFlow<Nothing>::Break;
  return BreakStatement(Nothing());
}
namespace internal {
// Implementation of `loop`. A `Loop` holds the `iterate` and `body`
// callables, the optional `pid` used as the execution context, and
// the promise whose future is handed back to the caller by `start`.
// Instances can only be created via `create` (the constructors are
// protected) and are always managed by a `std::shared_ptr` so that
// the callbacks registered below can capture `shared()` / `weak()`
// references to the loop.
template <typename Iterate, typename Body, typename T, typename R>
class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T, R>>
{
public:
// Creates a `Loop` owned by a `std::shared_ptr`, forwarding the
// callables to the matching constructor.
//
// NOTE(review): presumably `new` is used rather than
// `std::make_shared` because the constructors are protected.
template <typename Iterate_, typename Body_>
static std::shared_ptr<Loop> create(
const Option<UPID>& pid,
Iterate_&& iterate,
Body_&& body)
{
return std::shared_ptr<Loop>(
new Loop(
pid,
std::forward<Iterate_>(iterate),
std::forward<Body_>(body)));
}
// Returns an owning reference to this loop.
std::shared_ptr<Loop> shared()
{
// Must fully specify `shared_from_this` because we're templated.
return std::enable_shared_from_this<Loop>::shared_from_this();
}
// Returns a non-owning reference to this loop.
std::weak_ptr<Loop> weak()
{
return std::weak_ptr<Loop>(shared());
}
// Starts the loop and returns the future of `promise`. Registers the
// discard propagation and then performs the first `iterate()`, either
// by dispatching to `pid` (if set) or directly in the caller's
// context.
Future<R> start()
{
auto self = shared();
auto weak_self = weak();
// Propagating discards:
//
// When the caller does a discard we need to propagate it to
// either the future returned from `iterate` or the future
// returned from `body`. One easy way to do this would be to add
// an `onAny` callback for every future returned from `iterate`
// and `body`, but that would be a slow memory leak that would
// grow over time, especially if the loop was actually
// infinite. Instead, we capture the current future that needs to
// be discarded within a `discard` function that we'll invoke when
// we get a discard. Because there is a race setting the `discard`
// function and reading it out to invoke we have to synchronize
// access using a mutex. An alternative strategy would be to use
// something like `atomic_load` and `atomic_store` with
// `shared_ptr` so that we can swap the current future(s)
// atomically.
//
// NOTE(review): only a weak reference is captured here, presumably
// so that this callback does not keep the loop alive on its own.
promise.future().onDiscard([weak_self]() {
auto self = weak_self.lock();
if (self) {
// We need to make a copy of the current `discard` function so
// that we can invoke it outside of the `synchronized` block
// in the event that discarding causes the `onAny` callbacks
// that we have added in `run` to execute, which may deadlock
// attempting to re-acquire `mutex`!
std::function<void()> f = []() {};
synchronized (self->mutex) {
f = self->discard;
}
f();
}
});
if (pid.isSome()) {
// Start the loop using `pid` as the execution context.
dispatch(pid.get(), [self]() {
self->run(self->iterate());
});
// TODO(benh): Link with `pid` so that we can discard or abandon
// the promise in the event `pid` terminates and didn't discard
// us so that we can avoid any leaks (memory or otherwise).
} else {
run(iterate());
}
return promise.future();
}
// Drives the loop starting from `next`, the future returned by the
// most recent call to `iterate`. As long as both the `iterate` future
// and the `body` future are already ready we keep looping right here
// without adding any callbacks. As soon as either one is not ready we
// register a continuation on it via `onAny` (deferred to `pid` if
// set), record how to discard it, and return.
void run(Future<T> next)
{
auto self = shared();
// Reset `discard` so that we're not delaying cleanup of any
// captured futures longer than necessary.
//
// TODO(benh): Use `WeakFuture` in `discard` functions instead.
synchronized (mutex) {
discard = []() {};
}
while (next.isReady()) {
Future<ControlFlow<R>> flow = body(next.get());
if (flow.isReady()) {
// The body completed synchronously: either grab the next value
// and go around again or finish the loop with the break value.
switch (flow->statement()) {
case ControlFlow<R>::Statement::CONTINUE: {
next = iterate();
continue;
}
case ControlFlow<R>::Statement::BREAK: {
promise.set(flow->value());
return;
}
}
} else {
// The body is not ready: resume once `flow` transitions. The
// continuation holds `self` so the loop stays alive until then.
auto continuation = [self](const Future<ControlFlow<R>>& flow) {
if (flow.isReady()) {
switch (flow->statement()) {
case ControlFlow<R>::Statement::CONTINUE: {
self->run(self->iterate());
break;
}
case ControlFlow<R>::Statement::BREAK: {
self->promise.set(flow->value());
break;
}
}
} else if (flow.isFailed()) {
self->promise.fail(flow.failure());
} else if (flow.isDiscarded()) {
self->promise.discard();
}
};
// Run the continuation in the context of `pid` if we have one,
// otherwise directly as an `onAny` callback of `flow`.
if (pid.isSome()) {
flow.onAny(defer(pid.get(), continuation));
} else {
flow.onAny(continuation);
}
// Remember how to discard the future we're now blocked on, but
// only if a discard has not already been requested.
if (!promise.future().hasDiscard()) {
synchronized (mutex) {
self->discard = [=]() mutable { flow.discard(); };
}
}
// There's a race between when a discard occurs and the
// `discard` function gets invoked and therefore we must
// explicitly always do a discard. In addition, after a
// discard occurs we'll need to explicitly do discards for
// each new future that blocks.
if (promise.future().hasDiscard()) {
flow.discard();
}
return;
}
}
// We only get here if `next` is not ready: resume via `run` once it
// becomes ready, or propagate its failure/discard to `promise`.
auto continuation = [self](const Future<T>& next) {
if (next.isReady()) {
self->run(next);
} else if (next.isFailed()) {
self->promise.fail(next.failure());
} else if (next.isDiscarded()) {
self->promise.discard();
}
};
if (pid.isSome()) {
next.onAny(defer(pid.get(), continuation));
} else {
next.onAny(continuation);
}
// Same discard bookkeeping as for `flow` in the loop above, but for
// the pending `iterate` future.
if (!promise.future().hasDiscard()) {
synchronized (mutex) {
discard = [=]() mutable { next.discard(); };
}
}
// See comment above as to why we need to explicitly discard
// regardless of the path the if statement took above.
if (promise.future().hasDiscard()) {
next.discard();
}
}
protected:
// Constructors are protected so that instances get created through
// `create`. The first overload copies the callables, the second one
// moves them.
Loop(const Option<UPID>& pid, const Iterate& iterate, const Body& body)
: pid(pid), iterate(iterate), body(body) {}
Loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
: pid(pid), iterate(std::move(iterate)), body(std::move(body)) {}
private:
// Execution context for the loop; if `None` the first iteration runs
// directly in `start` and continuations are not deferred.
const Option<UPID> pid;
// Produces the next value of the loop.
Iterate iterate;
// Consumes a value and decides whether to continue or break.
Body body;
// Completed with the break value, or failed/discarded, see `run`.
Promise<R> promise;
// In order to discard the loop safely we capture the future that
// needs to be discarded within the `discard` function and
// synchronize reading and writing that function with a mutex.
std::mutex mutex;
std::function<void()> discard = []() {};
};
} // namespace internal {
// Definition of the `loop` entry point declared at the top of this
// header: builds an `internal::Loop` for the given callables and
// starts it, returning the future that the loop will complete.
template <typename Iterate, typename Body, typename T, typename CF, typename V>
Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
{
  // The loop stores the callables by value, so strip any reference
  // and cv-qualifiers from the deduced (forwarding reference) types.
  using IterateFn = typename std::decay<Iterate>::type;
  using BodyFn = typename std::decay<Body>::type;
  using LoopImpl = internal::Loop<IterateFn, BodyFn, T, V>;

  auto instance = LoopImpl::create(
      pid,
      std::forward<Iterate>(iterate),
      std::forward<Body>(body));

  return instance->start();
}
} // namespace process {
#endif // __PROCESS_LOOP_HPP__