blob: 4281d31c3eae743c6f46798f128628115160499d [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_FUTURE_HPP__
#define __PROCESS_FUTURE_HPP__
#include <assert.h>
#include <atomic>
#include <list>
#include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
#include <ostream>
#include <set>
#include <type_traits>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <process/clock.hpp>
#include <process/latch.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/timer.hpp>
#include <stout/abort.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/preprocessor.hpp>
#include <stout/result.hpp>
#include <stout/result_of.hpp>
#include <stout/stringify.hpp>
#include <stout/synchronized.hpp>
#include <stout/try.hpp>
#include <stout/os/strerror.hpp>
namespace process {
// Forward declarations (instead of include to break circular dependency).
template <typename G>
struct _Deferred;
template <typename T>
class Future;
namespace internal {
template <typename T>
struct wrap;
template <typename T>
struct unwrap;
} // namespace internal {
// Forward declaration of Promise.
template <typename T>
class Promise;
// Forward declaration of WeakFuture.
template <typename T>
class WeakFuture;
// Forward declaration of Failure.
struct Failure;
struct ErrnoFailure;
// Definition of a "shared" future. A future can hold any
// copy-constructible value. A future is considered "shared" because
// by default a future can be accessed concurrently.
template <typename T>
class Future
{
public:
// Constructs a failed future.
static Future<T> failed(const std::string& message);
Future();
/*implicit*/ Future(const T& _t);
/*implicit*/ Future(T&& _t);
template <typename U>
/*implicit*/ Future(const U& u);
/*implicit*/ Future(const Failure& failure);
/*implicit*/ Future(const ErrnoFailure& failure);
/*implicit*/ Future(const Future<T>& that) = default;
/*implicit*/ Future(Future<T>&& that) = default;
template <typename E>
/*implicit*/ Future(const Try<T, E>& t);
template <typename E>
/*implicit*/ Future(const Try<Future<T>, E>& t);
~Future() = default;
// Futures are assignable (and copyable). This results in the
// reference to the previous future data being decremented and a
// reference to 'that' being incremented.
Future<T>& operator=(const Future<T>& that) = default;
Future<T>& operator=(Future<T>&& that) = default;
// Comparison operators useful for using futures in collections.
bool operator==(const Future<T>& that) const;
bool operator!=(const Future<T>& that) const;
bool operator<(const Future<T>& that) const;
// Helpers to get the current state of this future.
bool isPending() const;
bool isReady() const;
bool isDiscarded() const;
bool isFailed() const;
bool isAbandoned() const;
bool hasDiscard() const;
// Discards this future. Returns false if discard has already been
// called or the future has already completed, i.e., is ready,
// failed, or discarded. Note that a discard does not terminate any
// computation but rather acts as a suggestion or indication that
// the caller no longer cares about the result of some
// computation. The callee can decide whether or not to continue the
// computation on their own (where best practices are to attempt to
// stop the computation if possible). The callee can discard the
// computation via Promise::discard which completes a future, at
// which point, Future::isDiscarded is true (and the
// Future::onDiscarded callbacks are executed). Before that point,
// but after calling Future::discard, only Future::hasDiscard will
// return true and the Future::onDiscard callbacks will be invoked.
bool discard();
// Waits for this future to become ready, discarded, or failed.
bool await(const Duration& duration = Seconds(-1)) const;
// Return the value associated with this future, waits indefinitely
// until a value gets associated or until the future is discarded.
const T& get() const;
const T* operator->() const;
// Returns the failure message associated with this future.
const std::string& failure() const;
// Type of the callback functions that can get invoked when the
// future gets set, fails, or is discarded.
typedef lambda::CallableOnce<void()> AbandonedCallback;
typedef lambda::CallableOnce<void()> DiscardCallback;
typedef lambda::CallableOnce<void(const T&)> ReadyCallback;
typedef lambda::CallableOnce<void(const std::string&)> FailedCallback;
typedef lambda::CallableOnce<void()> DiscardedCallback;
typedef lambda::CallableOnce<void(const Future<T>&)> AnyCallback;
// Installs callbacks for the specified events and returns a const
// reference to 'this' in order to easily support chaining.
const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
const Future<T>& onDiscard(DiscardCallback&& callback) const;
const Future<T>& onReady(ReadyCallback&& callback) const;
const Future<T>& onFailed(FailedCallback&& callback) const;
const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
const Future<T>& onAny(AnyCallback&& callback) const;
// TODO(benh): Add onReady, onFailed, onAny for _Deferred<F> where F
// is not expected.
template <typename F>
const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
{
return onAbandoned(
std::move(deferred).operator lambda::CallableOnce<void()>());
}
template <typename F>
const Future<T>& onDiscard(_Deferred<F>&& deferred) const
{
return onDiscard(
std::move(deferred).operator lambda::CallableOnce<void()>());
}
template <typename F>
const Future<T>& onReady(_Deferred<F>&& deferred) const
{
return onReady(
std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
}
template <typename F>
const Future<T>& onFailed(_Deferred<F>&& deferred) const
{
return onFailed(std::move(deferred)
.operator lambda::CallableOnce<void(const std::string&)>());
}
template <typename F>
const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
{
return onDiscarded(
std::move(deferred).operator lambda::CallableOnce<void()>());
}
template <typename F>
const Future<T>& onAny(_Deferred<F>&& deferred) const
{
return onAny(std::move(deferred)
.operator lambda::CallableOnce<void(const Future<T>&)>());
}
private:
// We use the 'Prefer' and 'LessPrefer' structs as a way to prefer
// one function over the other when doing SFINAE for the 'onReady',
// 'onFailed', 'onAny', and 'then' functions. In each of these cases
// we prefer calling the version of the functor that takes in an
// argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
// std::string&' for 'onFailed'), but we allow functors that don't
// care about the argument. We don't need to do this for
// 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
// take an argument.
struct LessPrefer {};
struct Prefer : LessPrefer {};
template <typename F, typename = typename result_of<F(const T&)>::type>
const Future<T>& onReady(F&& f, Prefer) const
{
return onReady(lambda::CallableOnce<void(const T&)>(
lambda::partial(
[](typename std::decay<F>::type&& f, const T& t) {
std::move(f)(t);
},
std::forward<F>(f),
lambda::_1)));
}
// This is the less preferred `onReady`, we prefer the `onReady` method which
// has `f` taking a `const T&` parameter. Unfortunately, to complicate
// matters, if `F` is the result of a `std::bind` expression we need to SFINAE
// out this version of `onReady` and force the use of the preferred `onReady`
// (which works because `std::bind` will just ignore the `const T&` argument).
// This is necessary because Visual Studio 2015 doesn't support using the
// `std::bind` call operator with `result_of` as it's technically not a
// requirement by the C++ standard.
template <
typename F,
typename = typename result_of<typename std::enable_if<
!std::is_bind_expression<typename std::decay<F>::type>::value,
F>::type()>::type>
const Future<T>& onReady(F&& f, LessPrefer) const
{
return onReady(lambda::CallableOnce<void(const T&)>(
lambda::partial(
[](typename std::decay<F>::type&& f, const T&) {
std::move(f)();
},
std::forward<F>(f),
lambda::_1)));
}
template <typename F, typename = typename result_of<F(const std::string&)>::type> // NOLINT(whitespace/line_length)
const Future<T>& onFailed(F&& f, Prefer) const
{
return onFailed(lambda::CallableOnce<void(const std::string&)>(
lambda::partial(
[](typename std::decay<F>::type&& f, const std::string& message) {
std::move(f)(message);
},
std::forward<F>(f),
lambda::_1)));
}
// Refer to the less preferred version of `onReady` for why these SFINAE
// conditions are necessary.
template <
typename F,
typename = typename result_of<typename std::enable_if<
!std::is_bind_expression<typename std::decay<F>::type>::value,
F>::type()>::type>
const Future<T>& onFailed(F&& f, LessPrefer) const
{
return onFailed(lambda::CallableOnce<void(const std::string&)>(
lambda::partial(
[](typename std::decay<F>::type&& f, const std::string&) mutable {
std::move(f)();
},
std::forward<F>(f),
lambda::_1)));
}
template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
const Future<T>& onAny(F&& f, Prefer) const
{
return onAny(lambda::CallableOnce<void(const Future<T>&)>(
lambda::partial(
[](typename std::decay<F>::type&& f, const Future<T>& future) {
std::move(f)(future);
},
std::forward<F>(f),
lambda::_1)));
}
// Refer to the less preferred version of `onReady` for why these SFINAE
// conditions are necessary.
template <
typename F,
typename = typename result_of<typename std::enable_if<
!std::is_bind_expression<typename std::decay<F>::type>::value,
F>::type()>::type>
const Future<T>& onAny(F&& f, LessPrefer) const
{
return onAny(lambda::CallableOnce<void(const Future<T>&)>(
lambda::partial(
[](typename std::decay<F>::type&& f, const Future<T>&) {
std::move(f)();
},
std::forward<F>(f),
lambda::_1)));
}
public:
template <typename F>
const Future<T>& onAbandoned(F&& f) const
{
return onAbandoned(lambda::CallableOnce<void()>(
lambda::partial(
[](typename std::decay<F>::type&& f) {
std::move(f)();
},
std::forward<F>(f))));
}
template <typename F>
const Future<T>& onDiscard(F&& f) const
{
return onDiscard(lambda::CallableOnce<void()>(
lambda::partial(
[](typename std::decay<F>::type&& f) {
std::move(f)();
},
std::forward<F>(f))));
}
template <typename F>
const Future<T>& onReady(F&& f) const
{
return onReady(std::forward<F>(f), Prefer());
}
template <typename F>
const Future<T>& onFailed(F&& f) const
{
return onFailed(std::forward<F>(f), Prefer());
}
template <typename F>
const Future<T>& onDiscarded(F&& f) const
{
return onDiscarded(lambda::CallableOnce<void()>(
lambda::partial(
[](typename std::decay<F>::type&& f) {
std::move(f)();
},
std::forward<F>(f))));
}
template <typename F>
const Future<T>& onAny(F&& f) const
{
return onAny(std::forward<F>(f), Prefer());
}
// Installs callbacks that get executed when this future is ready
// and associates the result of the callback with the future that is
// returned to the caller (which may be of a different type).
template <typename X>
Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
template <typename X>
Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
template <typename X>
Future<X> then(lambda::CallableOnce<Future<X>()> f) const
{
return then(lambda::CallableOnce<Future<X>(const T&)>(
lambda::partial(std::move(f))));
}
template <typename X>
Future<X> then(lambda::CallableOnce<X()> f) const
{
return then(lambda::CallableOnce<X(const T&)>(
lambda::partial(std::move(f))));
}
private:
template <
typename F,
typename X =
typename internal::unwrap<typename result_of<F(const T&)>::type>::type>
Future<X> then(_Deferred<F>&& f, Prefer) const
{
// note the then<X> is necessary to not have an infinite loop with
// then(F&& f)
return then<X>(
std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
}
// Refer to the less preferred version of `onReady` for why these SFINAE
// conditions are necessary.
template <
typename F,
typename X = typename internal::unwrap<
typename result_of<typename std::enable_if<
!std::is_bind_expression<typename std::decay<F>::type>::value,
F>::type()>::type>::type>
Future<X> then(_Deferred<F>&& f, LessPrefer) const
{
return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
}
template <typename F, typename X = typename internal::unwrap<typename result_of<F(const T&)>::type>::type> // NOLINT(whitespace/line_length)
Future<X> then(F&& f, Prefer) const
{
return then<X>(
lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
}
// Refer to the less preferred version of `onReady` for why these SFINAE
// conditions are necessary.
template <
typename F,
typename X = typename internal::unwrap<
typename result_of<typename std::enable_if<
!std::is_bind_expression<typename std::decay<F>::type>::value,
F>::type()>::type>::type>
Future<X> then(F&& f, LessPrefer) const
{
return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
}
public:
// NOTE: There are two bugs we're dealing with here.
// (1) GCC bug where the explicit use of `this->` is required in the
// trailing return type: gcc.gnu.org/bugzilla/show_bug.cgi?id=57543
// (2) VS 2017 RC bug where the explicit use of `this->` is disallowed.
//
// Since VS 2015 and 2017 RC both implement C++14's deduced return type for
// functions, we simply choose to use that on Windows.
//
// TODO(mpark): Remove the trailing return type once we get to C++14.
template <typename F>
auto then(F&& f) const
#ifndef __WINDOWS__
-> decltype(this->then(std::forward<F>(f), Prefer()))
#endif // __WINDOWS__
{
return then(std::forward<F>(f), Prefer());
}
// Installs callbacks that get executed if this future is abandoned,
// is discarded, or failed.
template <typename F>
Future<T> recover(F&& f) const;
template <typename F>
Future<T> recover(_Deferred<F>&& deferred) const
{
return recover(
std::move(deferred)
.operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
}
// TODO(benh): Considering adding a `rescue` function for rescuing
// abandoned futures.
// Installs callbacks that get executed if this future completes
// because it failed.
Future<T> repair(
lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
// TODO(benh): Add overloads of 'repair' that don't require passing
// in a function that takes the 'const Future<T>&' parameter and use
// Prefer/LessPrefer to disambiguate.
// Invokes the specified function after some duration if this future
// has not been completed (set, failed, or discarded). Note that
// this function is agnostic of discard semantics and while it will
// propagate discarding "up the chain" it will still invoke the
// specified callback after the specified duration even if 'discard'
// was called on the returned future.
Future<T> after(
const Duration& duration,
lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
// TODO(benh): Add overloads of 'after' that don't require passing
// in a function that takes the 'const Future<T>&' parameter and use
// Prefer/LessPrefer to disambiguate.
private:
template <typename U>
friend class Future;
friend class Promise<T>;
friend class WeakFuture<T>;
template <typename U>
friend std::ostream& operator<<(std::ostream&, const Future<U>&);
enum State
{
PENDING,
READY,
FAILED,
DISCARDED,
};
struct Data
{
Data();
~Data() = default;
void clearAllCallbacks();
std::atomic_flag lock = ATOMIC_FLAG_INIT;
State state;
bool discard;
bool associated;
bool abandoned;
// One of:
// 1. None, the state is PENDING or DISCARDED.
// 2. Some, the state is READY.
// 3. Error, the state is FAILED; 'error()' stores the message.
Result<T> result;
std::vector<AbandonedCallback> onAbandonedCallbacks;
std::vector<DiscardCallback> onDiscardCallbacks;
std::vector<ReadyCallback> onReadyCallbacks;
std::vector<FailedCallback> onFailedCallbacks;
std::vector<DiscardedCallback> onDiscardedCallbacks;
std::vector<AnyCallback> onAnyCallbacks;
};
// Abandons this future. Returns false if the future is already
// associated or no longer pending. Otherwise returns true and any
// Future::onAbandoned callbacks wil be run.
//
// If `propagating` is true then we'll abandon this future even if
// it has already been associated. This is important because
// `~Promise()` will try and abandon and we need to ignore that if
// the future has been associated since the promise will no longer
// be setting the future anyway (and is likely the reason it's being
// destructed, because it's useless). When the future that we've
// associated with gets abandoned, however, then we need to actually
// abandon this future too. Here's an example of this:
//
// 1: Owned<Promise<int>> promise1(new Promise<int>());
// 2: Owned<Promise<int>> promise2(new Promise<int>());
// 3:
// 4: Future<int> future1 = promise1->future();
// 5: Future<int> future2 = promise2->future();
// 6:
// 7: promise1->associate(future2);
// 8:
// 9: promise1.reset();
// 10:
// 11: assert(!future1.isAbandoned());
// 12:
// 13: promise2.reset();
// 14:
// 15: assert(future2.isAbandoned());
// 16: assert(future3.isAbandoned());
//
// At line 9 `~Promise()` will attempt to abandon the future by
// calling `abandon()` but since it's been associated we won't do
// anything. On line 13 the `onAbandoned()` callback will call
// `abandon(true)` and know we'll actually abandon the future
// because we're _propagating_ the abandon from the associated
// future.
//
// NOTE: this is an _INTERNAL_ function and should never be exposed
// or used outside of the implementation.
bool abandon(bool propagating = false);
// Sets the value for this future, unless the future is already set,
// failed, or discarded, in which case it returns false.
bool set(const T& _t);
bool set(T&& _t);
template <typename U>
bool _set(U&& _u);
// Sets this future as failed, unless the future is already set,
// failed, or discarded, in which case it returns false.
bool fail(const std::string& _message);
std::shared_ptr<Data> data;
};
namespace internal {
// Helper for executing callbacks that have been registered.
//
// TODO(*): Invoke callbacks in another execution context.
template <typename C, typename... Arguments>
void run(std::vector<C>&& callbacks, Arguments&&... arguments)
{
for (size_t i = 0; i < callbacks.size(); ++i) {
std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
}
}
} // namespace internal {
// Represents a weak reference to a future. This class is used to
// break cyclic dependencies between futures.
template <typename T>
class WeakFuture
{
public:
explicit WeakFuture(const Future<T>& future);
// Converts this weak reference to a concrete future. Returns none
// if the conversion is not successful.
Option<Future<T>> get() const;
private:
std::weak_ptr<typename Future<T>::Data> data;
};
template <typename T>
WeakFuture<T>::WeakFuture(const Future<T>& future)
: data(future.data) {}
template <typename T>
Option<Future<T>> WeakFuture<T>::get() const
{
Future<T> future;
future.data = data.lock();
if (future.data) {
return future;
}
return None();
}
// Helper for creating failed futures.
struct Failure
{
explicit Failure(const std::string& _message) : message(_message) {}
explicit Failure(const Error& error) : message(error.message) {}
const std::string message;
};
struct ErrnoFailure : public Failure
{
ErrnoFailure() : ErrnoFailure(errno) {}
explicit ErrnoFailure(int _code)
: Failure(os::strerror(_code)), code(_code) {}
explicit ErrnoFailure(const std::string& message)
: ErrnoFailure(errno, message) {}
ErrnoFailure(int _code, const std::string& message)
: Failure(message + ": " + os::strerror(_code)), code(_code) {}
const int code;
};
// Forward declaration to use as friend below.
namespace internal {
template <typename U>
void discarded(Future<U> future);
} // namespace internal {
// TODO(benh): Make Promise a subclass of Future?
template <typename T>
class Promise
{
public:
Promise();
virtual ~Promise();
explicit Promise(const T& t);
Promise(Promise&& that) = default;
Promise& operator=(Promise&&) = default;
// Not copyable, not assignable.
Promise(const Promise& that) = delete;
Promise& operator=(const Promise&) = delete;
bool discard();
bool set(const T& _t);
bool set(T&& _t);
bool set(const Future<T>& future); // Alias for associate.
bool associate(const Future<T>& future);
bool fail(const std::string& message);
// Returns a copy of the future associated with this promise.
Future<T> future() const;
private:
template <typename U>
friend class Future;
template <typename U>
friend void internal::discarded(Future<U> future);
template <typename U>
bool _set(U&& u);
// Helper for doing the work of actually discarding a future (called
// from Promise::discard as well as internal::discarded).
static bool discard(Future<T> future);
Future<T> f;
};
template <>
class Promise<void>;
template <typename T>
class Promise<T&>;
namespace internal {
// Discards a weak future. If the weak future is invalid (i.e., the
// future it references to has already been destroyed), this operation
// is treated as a no-op.
template <typename T>
void discard(WeakFuture<T> reference)
{
Option<Future<T>> future = reference.get();
if (future.isSome()) {
Future<T> future_ = future.get();
future_.discard();
}
}
// Helper for invoking Promise::discard in an onDiscarded callback
// (since the onDiscarded callback requires returning void we can't
// bind with Promise::discard).
template <typename T>
void discarded(Future<T> future)
{
Promise<T>::discard(future);
}
} // namespace internal {
template <typename T>
Promise<T>::Promise()
{
// Need to "unset" `abandoned` since it gets set in the empty
// constructor for `Future`.
f.data->abandoned = false;
}
template <typename T>
Promise<T>::Promise(const T& t)
: f(t) {}
template <typename T>
Promise<T>::~Promise()
{
// Note that we don't discard the promise as we don't want to give
// the illusion that any computation hasn't started (or possibly
// finished) in the event that computation is "visible" by other
// means. However, we try and abandon the future if it hasn't been
// associated or set (or moved, i.e., `f.data` is true).
if (f.data) {
f.abandon();
}
}
template <typename T>
bool Promise<T>::discard()
{
if (!f.data->associated) {
return discard(f);
}
return false;
}
template <typename T>
bool Promise<T>::set(T&& t)
{
return _set(std::move(t));
}
template <typename T>
bool Promise<T>::set(const T& t)
{
return _set(t);
}
template <typename T>
template <typename U>
bool Promise<T>::_set(U&& u)
{
if (!f.data->associated) {
return f.set(std::forward<U>(u));
}
return false;
}
template <typename T>
bool Promise<T>::set(const Future<T>& future)
{
return associate(future);
}
template <typename T>
bool Promise<T>::associate(const Future<T>& future)
{
bool associated = false;
synchronized (f.data->lock) {
// Don't associate if this promise has completed. Note that this
// does not include if Future::discard was called on this future
// since in that case that would still leave the future PENDING
// (note that we cover that case below).
if (f.data->state == Future<T>::PENDING && !f.data->associated) {
associated = f.data->associated = true;
// After this point we don't allow 'f' to be completed via the
// promise since we've set 'associated' but Future::discard on
// 'f' might get called which will get propagated via the
// 'f.onDiscard' below. Note that we currently don't propagate a
// discard from 'future.onDiscard' but these semantics might
// change if/when we make 'f' and 'future' true aliases of one
// another.
}
}
// Note that we do the actual associating after releasing the lock
// above to avoid deadlocking by attempting to require the lock
// within from invoking 'f.onDiscard' and/or 'f.set/fail' via the
// bind statements from doing 'future.onReady/onFailed'.
if (associated) {
// TODO(jieyu): Make 'f' a true alias of 'future'. Currently, only
// 'discard' is associated in both directions. In other words, if
// a future gets discarded, the other future will also get
// discarded. For 'set' and 'fail', they are associated only in
// one direction. In other words, calling 'set' or 'fail' on this
// promise will not affect the result of the future that we
// associated.
f.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(future)));
// Need to disambiguate for the compiler.
bool (Future<T>::*set)(const T&) = &Future<T>::set;
future
.onReady(lambda::bind(set, f, lambda::_1))
.onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
.onDiscarded(lambda::bind(&internal::discarded<T>, f))
.onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
}
return associated;
}
template <typename T>
bool Promise<T>::fail(const std::string& message)
{
if (!f.data->associated) {
return f.fail(message);
}
return false;
}
template <typename T>
Future<T> Promise<T>::future() const
{
return f;
}
// Internal helper utilities.
namespace internal {
template <typename T>
struct wrap
{
typedef Future<T> type;
};
template <typename X>
struct wrap<Future<X>>
{
typedef Future<X> type;
};
template <typename T>
struct unwrap
{
typedef T type;
};
template <typename X>
struct unwrap<Future<X>>
{
typedef X type;
};
template <typename T>
void select(
const Future<T>& future,
std::shared_ptr<Promise<Future<T>>> promise)
{
// We never fail the future associated with our promise.
assert(!promise->future().isFailed());
if (promise->future().isPending()) { // No-op if it's discarded.
if (future.isReady()) { // We only set the promise if a future is ready.
promise->set(future);
}
}
}
} // namespace internal {
// TODO(benh): Move select and discard into 'futures' namespace.
// Returns a future that captures any ready future in a set. Note that
// select DOES NOT capture a future that has failed or been discarded.
template <typename T>
Future<Future<T>> select(const std::set<Future<T>>& futures)
{
std::shared_ptr<Promise<Future<T>>> promise(new Promise<Future<T>>());
promise->future().onDiscard(
lambda::bind(&internal::discarded<Future<T>>, promise->future()));
foreach (const Future<T>& future, futures) {
future.onAny([=](const Future<T>& f) {
internal::select(f, promise);
});
}
return promise->future();
}
template <typename Futures>
void discard(const Futures& futures)
{
foreach (auto future, futures) { // Need a non-const copy to discard.
future.discard();
}
}
template <typename T>
bool Promise<T>::discard(Future<T> future)
{
bool result = false;
synchronized (future.data->lock) {
if (future.data->state == Future<T>::PENDING) {
future.data->state = Future<T>::DISCARDED;
result = true;
}
}
// Invoke all callbacks associated with this future being
// DISCARDED. We don't need a lock because the state is now in
// DISCARDED so there should not be any concurrent modifications to
// the callbacks.
if (result) {
// NOTE: we rely on the fact that we have `future` to protect
// ourselves from one of the callbacks erroneously deleting the
// future. In `Future::_set()` and `Future::fail()` we have to
// explicitly take a copy to protect ourselves.
internal::run(std::move(future.data->onDiscardedCallbacks));
internal::run(std::move(future.data->onAnyCallbacks), future);
future.data->clearAllCallbacks();
}
return result;
}
template <typename T>
Future<T> Future<T>::failed(const std::string& message)
{
Future<T> future;
future.fail(message);
return future;
}
template <typename T>
Future<T>::Data::Data()
: state(PENDING),
discard(false),
associated(false),
abandoned(false),
result(None()) {}
template <typename T>
void Future<T>::Data::clearAllCallbacks()
{
onAbandonedCallbacks.clear();
onAnyCallbacks.clear();
onDiscardCallbacks.clear();
onDiscardedCallbacks.clear();
onFailedCallbacks.clear();
onReadyCallbacks.clear();
}
template <typename T>
Future<T>::Future()
: data(new Data())
{
data->abandoned = true;
}
template <typename T>
Future<T>::Future(const T& _t)
: data(new Data())
{
set(_t);
}
template <typename T>
Future<T>::Future(T&& _t)
: data(new Data())
{
set(std::move(_t));
}
template <typename T>
template <typename U>
Future<T>::Future(const U& u)
: data(new Data())
{
set(u);
}
template <typename T>
Future<T>::Future(const Failure& failure)
: data(new Data())
{
fail(failure.message);
}
template <typename T>
Future<T>::Future(const ErrnoFailure& failure)
: data(new Data())
{
fail(failure.message);
}
template <typename T>
template <typename E>
Future<T>::Future(const Try<T, E>& t)
: data(new Data())
{
if (t.isSome()){
set(t.get());
} else {
// TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
fail(stringify(t.error()));
}
}
template <typename T>
template <typename E>
Future<T>::Future(const Try<Future<T>, E>& t)
: data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
{
if (!t.isSome()) {
// TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
fail(stringify(t.error()));
}
}
template <typename T>
bool Future<T>::operator==(const Future<T>& that) const
{
return data == that.data;
}
template <typename T>
bool Future<T>::operator!=(const Future<T>& that) const
{
return !(*this == that);
}
template <typename T>
bool Future<T>::operator<(const Future<T>& that) const
{
return data < that.data;
}
template <typename T>
bool Future<T>::discard()
{
bool result = false;
std::vector<DiscardCallback> callbacks;
synchronized (data->lock) {
if (!data->discard && data->state == PENDING) {
result = data->discard = true;
callbacks.swap(data->onDiscardCallbacks);
}
}
// Invoke all callbacks associated with doing a discard on this
// future. The callbacks get destroyed when we exit from the
// function.
if (result) {
internal::run(std::move(callbacks));
}
return result;
}
template <typename T>
bool Future<T>::abandon(bool propagating)
{
bool result = false;
std::vector<AbandonedCallback> callbacks;
synchronized (data->lock) {
if (!data->abandoned &&
data->state == PENDING &&
(!data->associated || propagating)) {
result = data->abandoned = true;
callbacks.swap(data->onAbandonedCallbacks);
}
}
// Invoke all callbacks. The callbacks get destroyed when we exit
// from the function.
if (result) {
internal::run(std::move(callbacks));
}
return result;
}
template <typename T>
bool Future<T>::isPending() const
{
return data->state == PENDING;
}
template <typename T>
bool Future<T>::isReady() const
{
return data->state == READY;
}
template <typename T>
bool Future<T>::isDiscarded() const
{
return data->state == DISCARDED;
}
template <typename T>
bool Future<T>::isFailed() const
{
return data->state == FAILED;
}
template <typename T>
bool Future<T>::isAbandoned() const
{
return data->abandoned;
}
template <typename T>
bool Future<T>::hasDiscard() const
{
return data->discard;
}
namespace internal {
inline void awaited(Owned<Latch> latch)
{
latch->trigger();
}
} // namespace internal {
template <typename T>
bool Future<T>::await(const Duration& duration) const
{
// NOTE: We need to preemptively allocate the Latch on the stack
// instead of lazily create it in the critical section below because
// instantiating a Latch requires creating a new process (at the
// time of writing this comment) which might need to do some
// synchronization in libprocess which might deadlock if some other
// code in libprocess is already holding a lock and then attempts to
// do Promise::set (or something similar) that attempts to acquire
// the lock that we acquire here. This is an artifact of using
// Future/Promise within the implementation of libprocess.
//
// We mostly only call 'await' in tests so this should not be a
// performance concern.
Owned<Latch> latch(new Latch());
bool pending = false;
synchronized (data->lock) {
if (data->state == PENDING) {
pending = true;
data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
}
}
if (pending) {
return latch->await(duration);
}
return true;
}
template <typename T>
const T& Future<T>::get() const
{
if (!isReady()) {
await();
}
CHECK(!isPending()) << "Future was in PENDING after await()";
// We can't use CHECK_READY here due to check.hpp depending on future.hpp.
if (!isReady()) {
CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure();
CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED";
}
assert(data->result.isSome());
return data->result.get();
}
template <typename T>
const T* Future<T>::operator->() const
{
return &get();
}
template <typename T>
const std::string& Future<T>::failure() const
{
if (data->state != FAILED) {
ABORT("Future::failure() but state != FAILED");
}
CHECK_ERROR(data->result);
return data->result.error();
}
template <typename T>
const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
{
bool run = false;
synchronized (data->lock) {
if (data->abandoned) {
run = true;
} else if (data->state == PENDING) {
data->onAbandonedCallbacks.emplace_back(std::move(callback));
}
}
// TODO(*): Invoke callback in another execution context.
if (run) {
std::move(callback)(); // NOLINT(misc-use-after-move)
}
return *this;
}
template <typename T>
const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
{
bool run = false;
synchronized (data->lock) {
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
data->onDiscardCallbacks.emplace_back(std::move(callback));
}
}
// TODO(*): Invoke callback in another execution context.
if (run) {
std::move(callback)(); // NOLINT(misc-use-after-move)
}
return *this;
}
template <typename T>
const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
{
bool run = false;
synchronized (data->lock) {
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
data->onReadyCallbacks.emplace_back(std::move(callback));
}
}
// TODO(*): Invoke callback in another execution context.
if (run) {
std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
}
return *this;
}
template <typename T>
const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
{
bool run = false;
synchronized (data->lock) {
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
data->onFailedCallbacks.emplace_back(std::move(callback));
}
}
// TODO(*): Invoke callback in another execution context.
if (run) {
std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
}
return *this;
}
template <typename T>
const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
{
bool run = false;
synchronized (data->lock) {
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
data->onDiscardedCallbacks.emplace_back(std::move(callback));
}
}
// TODO(*): Invoke callback in another execution context.
if (run) {
std::move(callback)(); // NOLINT(misc-use-after-move)
}
return *this;
}
template <typename T>
const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
{
bool run = false;
synchronized (data->lock) {
if (data->state == PENDING) {
data->onAnyCallbacks.emplace_back(std::move(callback));
} else {
run = true;
}
}
// TODO(*): Invoke callback in another execution context.
if (run) {
std::move(callback)(*this); // NOLINT(misc-use-after-move)
}
return *this;
}
namespace internal {
// NOTE: We need to name this 'thenf' versus 'then' to distinguish it
// from the function 'then' whose parameter 'f' doesn't return a
// Future since the compiler can't properly infer otherwise.
template <typename T, typename X>
void thenf(lambda::CallableOnce<Future<X>(const T&)>&& f,
std::unique_ptr<Promise<X>> promise,
const Future<T>& future)
{
if (future.isReady()) {
if (future.hasDiscard()) {
promise->discard();
} else {
promise->associate(std::move(f)(future.get()));
}
} else if (future.isFailed()) {
promise->fail(future.failure());
} else if (future.isDiscarded()) {
promise->discard();
}
}
template <typename T, typename X>
void then(lambda::CallableOnce<X(const T&)>&& f,
std::unique_ptr<Promise<X>> promise,
const Future<T>& future)
{
if (future.isReady()) {
if (future.hasDiscard()) {
promise->discard();
} else {
promise->set(std::move(f)(future.get()));
}
} else if (future.isFailed()) {
promise->fail(future.failure());
} else if (future.isDiscarded()) {
promise->discard();
}
}
template <typename T>
void repair(
lambda::CallableOnce<Future<T>(const Future<T>&)>&& f,
std::unique_ptr<Promise<T>> promise,
const Future<T>& future)
{
CHECK(!future.isPending());
if (future.isFailed()) {
promise->associate(std::move(f)(future));
} else {
promise->associate(future);
}
}
template <typename T>
void expired(
const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
const std::shared_ptr<Latch>& latch,
const std::shared_ptr<Promise<T>>& promise,
const std::shared_ptr<Option<Timer>>& timer,
const Future<T>& future)
{
if (latch->trigger()) {
// If this callback executed first (i.e., we triggered the latch)
// then we want to clear out the timer so that we don't hold a
// circular reference to `future` in it's own `onAny`
// callbacks. See the comment in `Future::after`.
*timer = None();
// Note that we don't bother checking if 'future' has been
// discarded (i.e., 'future.isDiscarded()' returns true) since
// there is a race between when we make that check and when we
// would invoke 'f(future)' so the callee 'f' should ALWAYS check
// if the future has been discarded and rather than hiding a
// non-deterministic bug we always call 'f' if the timer has
// expired.
promise->associate(std::move(*f)(future));
}
}
template <typename T>
void after(
const std::shared_ptr<Latch>& latch,
const std::shared_ptr<Promise<T>>& promise,
const std::shared_ptr<Option<Timer>>& timer,
const Future<T>& future)
{
CHECK(!future.isPending());
if (latch->trigger()) {
// If this callback executes first (i.e., we triggered the latch)
// it must be the case that `timer` is still some and we can try
// and cancel the timer.
CHECK_SOME(*timer);
Clock::cancel(timer->get());
// We also force the timer to get deallocated so that there isn't
// a cicular reference of the timer with itself which keeps around
// a reference to the original future.
*timer = None();
promise->associate(future);
}
}
} // namespace internal {
template <typename T>
template <typename X>
Future<X> Future<T>::then(lambda::CallableOnce<Future<X>(const T&)> f) const
{
std::unique_ptr<Promise<X>> promise(new Promise<X>());
Future<X> future = promise->future();
lambda::CallableOnce<void(const Future<T>&)> thenf = lambda::partial(
&internal::thenf<T, X>, std::move(f), std::move(promise), lambda::_1);
onAny(std::move(thenf));
onAbandoned([=]() mutable {
future.abandon();
});
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
return future;
}
template <typename T>
template <typename X>
Future<X> Future<T>::then(lambda::CallableOnce<X(const T&)> f) const
{
std::unique_ptr<Promise<X>> promise(new Promise<X>());
Future<X> future = promise->future();
lambda::CallableOnce<void(const Future<T>&)> then = lambda::partial(
&internal::then<T, X>, std::move(f), std::move(promise), lambda::_1);
onAny(std::move(then));
onAbandoned([=]() mutable {
future.abandon();
});
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
return future;
}
template <typename T>
template <typename F>
Future<T> Future<T>::recover(F&& f) const
{
std::shared_ptr<Promise<T>> promise(new Promise<T>());
const Future<T> future = *this;
typedef decltype(std::move(f)(future)) R;
std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
onAny([=]() {
if (future.isDiscarded() || future.isFailed()) {
// We reset `discard` so that if a future gets returned from
// `f(future)` we won't immediately discard it! We still want to
// let the future get discarded later, however, hence if it gets
// set again in the future it'll propagate to the returned
// future.
synchronized (promise->f.data->lock) {
promise->f.data->discard = false;
}
promise->set(std::move(*callable)(future));
} else {
promise->associate(future);
}
});
onAbandoned([=]() {
// See comment above for why we reset `discard` here.
synchronized (promise->f.data->lock) {
promise->f.data->discard = false;
}
promise->set(std::move(*callable)(future));
});
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
promise->future().onDiscard(
lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
return promise->future();
}
template <typename T>
Future<T> Future<T>::repair(
lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
{
std::unique_ptr<Promise<T>> promise(new Promise<T>());
Future<T> future = promise->future();
onAny(lambda::partial(
&internal::repair<T>, std::move(f), std::move(promise), lambda::_1));
onAbandoned([=]() mutable {
future.abandon();
});
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
return future;
}
template <typename T>
Future<T> Future<T>::after(
const Duration& duration,
lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
{
// TODO(benh): Using a Latch here but Once might be cleaner.
// Unfortunately, Once depends on Future so we can't easily use it
// from here.
std::shared_ptr<Latch> latch(new Latch());
std::shared_ptr<Promise<T>> promise(new Promise<T>());
// We need to control the lifetime of the timer we create below so
// that we can force the timer to get deallocated after it
// expires. The reason we want to force the timer to get deallocated
// after it expires is because the timer's lambda has a copy of
// `this` (i.e., a Future) and it's stored in the `onAny` callbacks
// of `this` thus creating a circular reference. By storing a
// `shared_ptr<Option<Timer>>` we're able to set the option to none
// after the timer expires which will deallocate our copy of the
// timer and leave the `Option<Timer>` stored in the lambda of the
// `onAny` callback as none. Note that this is safe because the
// `Latch` makes sure that only one of the callbacks will manipulate
// the `shared_ptr<Option<Timer>>` so there isn't any concurrency
// issues we have to worry about.
std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
std::shared_ptr<F> callable(new F(std::move(f)));
// Set up a timer to invoke the callback if this future has not
// completed. Note that we do not pass a weak reference for this
// future as we don't want the future to get cleaned up and then
// have the timer expire because then we wouldn't have a valid
// future that we could pass to `f`! The reference to `this` that is
// captured in the timer will get removed by setting the
// `Option<Timer>` to none (see comment above) either if the timer
// expires or if `this` completes and we cancel the timer (see
// `internal::expired` and `internal::after` callbacks for where we
// force the deallocation of our copy of the timer).
*timer = Clock::timer(
duration,
lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
*this));
onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
onAbandoned([=]() {
promise->future().abandon();
});
// Propagate discarding up the chain. To avoid cyclic dependencies,
// we keep a weak future in the callback.
promise->future().onDiscard(
lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
return promise->future();
}
template <typename T>
bool Future<T>::set(T&& t)
{
return _set(std::move(t));
}
template <typename T>
bool Future<T>::set(const T& t)
{
return _set(t);
}
template <typename T>
template <typename U>
bool Future<T>::_set(U&& u)
{
bool result = false;
synchronized (data->lock) {
if (data->state == PENDING) {
data->result = std::forward<U>(u);
data->state = READY;
result = true;
}
}
// Invoke all callbacks associated with this future being READY. We
// don't need a lock because the state is now in READY so there
// should not be any concurrent modifications to the callbacks.
if (result) {
// Grab a copy of `data` just in case invoking the callbacks
// erroneously attempts to delete this future.
std::shared_ptr<typename Future<T>::Data> copy = data;
internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
internal::run(std::move(copy->onAnyCallbacks), *this);
copy->clearAllCallbacks();
}
return result;
}
template <typename T>
bool Future<T>::fail(const std::string& _message)
{
bool result = false;
synchronized (data->lock) {
if (data->state == PENDING) {
data->result = Result<T>(Error(_message));
data->state = FAILED;
result = true;
}
}
// Invoke all callbacks associated with this future being FAILED. We
// don't need a lock because the state is now in FAILED so there
// should not be any concurrent modifications to the callbacks.
if (result) {
// Grab a copy of `data` just in case invoking the callbacks
// erroneously attempts to delete this future.
std::shared_ptr<typename Future<T>::Data> copy = data;
internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
internal::run(std::move(copy->onAnyCallbacks), *this);
copy->clearAllCallbacks();
}
return result;
}
template <typename T>
std::ostream& operator<<(std::ostream& stream, const Future<T>& future)
{
const std::string suffix = future.data->discard ? " (with discard)" : "";
switch (future.data->state) {
case Future<T>::PENDING:
if (future.data->abandoned) {
return stream << "Abandoned" << suffix;
}
return stream << "Pending" << suffix;
case Future<T>::READY:
// TODO(benh): Stringify `Future<T>::get()` if it can be
// stringified (will need to be SFINAE'ed appropriately).
return stream << "Ready" << suffix;
case Future<T>::FAILED:
return stream << "Failed" << suffix << ": " << future.failure();
case Future<T>::DISCARDED:
return stream << "Discarded" << suffix;
}
return stream;
}
// Helper for setting a set of Promises.
template <typename T>
void setPromises(std::set<Promise<T>*>* promises, const T& t)
{
foreach (Promise<T>* promise, *promises) {
promise->set(t);
delete promise;
}
promises->clear();
}
// Helper for failing a set of Promises.
template <typename T>
void failPromises(std::set<Promise<T>*>* promises, const std::string& failure)
{
foreach (Promise<T>* promise, *promises) {
promise->fail(failure);
delete promise;
}
promises->clear();
}
// Helper for discarding a set of Promises.
template <typename T>
void discardPromises(std::set<Promise<T>*>* promises)
{
foreach (Promise<T>* promise, *promises) {
promise->discard();
delete promise;
}
promises->clear();
}
// Helper for discarding an individual promise in the set.
template <typename T>
void discardPromises(std::set<Promise<T>*>* promises, const Future<T>& future)
{
foreach (Promise<T>* promise, *promises) {
if (promise->future() == future) {
promise->discard();
promises->erase(promise);
delete promise;
return;
}
}
}
// Returns a future that will not propagate a discard through to the
// future passed in as an argument. This can be very valuable if you
// want to block some future from getting discarded.
//
// Example:
//
// Promise<int> promise;
// Future<int> future = undiscardable(promise.future());
// future.discard();
// assert(!promise.future().hasDiscard());
//
// Or another example, when chaining futures:
//
// Future<int> future = undiscardable(
// foo()
// .then([]() { ...; })
// .then([]() { ...; }));
//
// This will guarantee that a discard _will not_ propagate to `foo()`
// or any of the futures returned from the invocations of `.then()`.
template <typename T>
Future<T> undiscardable(const Future<T>& future)
{
std::unique_ptr<Promise<T>> promise(new Promise<T>());
Future<T> future_ = promise->future();
future.onAny(lambda::partial(
[](std::unique_ptr<Promise<T>> promise, const Future<T>& future) {
promise->associate(future);
},
std::move(promise),
lambda::_1));
return future_;
}
// Decorator that for some callable `f` invokes
// `undiscardable(f(args))` for some `args`. This is used by the
// overload of `undiscardable()` that takes callables instead of a
// specialization of `Future`.
//
// TODO(benh): Factor out a generic decorator pattern to be used in
// other circumstances, e.g., to replace `_Deferred`.
template <typename F>
struct UndiscardableDecorator
{
template <
typename G,
typename std::enable_if<
std::is_constructible<F, G>::value, int>::type = 0>
UndiscardableDecorator(G&& g) : f(std::forward<G>(g)) {}
template <typename... Args>
auto operator()(Args&&... args)
-> decltype(std::declval<F&>()(std::forward<Args>(args)...))
{
using Result =
typename std::decay<decltype(f(std::forward<Args>(args)...))>::type;
static_assert(
is_specialization_of<Result, Future>::value,
"Expecting Future<T> to be returned from undiscarded(...)");
return undiscardable(f(std::forward<Args>(args)...));
}
F f;
};
// An overload of `undiscardable()` above that takes and returns a
// callable. The returned callable has decorated the provided callable
// `f` such that when the returned callable is invoked it will in turn
// invoke `undiscardable(f(args))` for some `args`. See
// `UndiscardableDecorator` above for more details.
//
// Example:
//
// Future<int> future = foo()
// .then(undiscardable([]() { ...; }));
//
// This guarantees that even if `future` is discarded the discard will
// not propagate into the lambda passed into `.then()`.
template <
typename F,
typename std::enable_if<
!is_specialization_of<typename std::decay<F>::type, Future>::value,
int>::type = 0>
UndiscardableDecorator<typename std::decay<F>::type> undiscardable(F&& f)
{
return UndiscardableDecorator<
typename std::decay<F>::type>(std::forward<F>(f));
}
} // namespace process {
#endif // __PROCESS_FUTURE_HPP__