blob: 82432a72ae291d472aca6de879803a0416829179 [file]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __PROCESS_DISPATCH_HPP__
#define __PROCESS_DISPATCH_HPP__
#include <functional>
#include <memory>
#include <string>
#include <process/process.hpp>
#include <stout/lambda.hpp>
#include <stout/preprocessor.hpp>
#include <stout/result_of.hpp>
namespace process {
// The dispatch mechanism enables you to "schedule" a method to get
// invoked on a process. The result of that method invocation is
// accessible via the future that is returned by the dispatch method
// (note, however, that it might not be the _same_ future as the one
// returned from the method, if the method even returns a future, see
// below). Assuming some class 'Fibonacci' has a (visible) method
// named 'compute' that takes an integer, N (and returns the Nth
// fibonacci number) you might use dispatch like so:
//
// PID<Fibonacci> pid = spawn(new Fibonacci(), true); // Use the GC.
// Future<int> f = dispatch(pid, &Fibonacci::compute, 10);
//
// Because the pid argument is "typed" we can ensure that methods are
// only invoked on processes that are actually of that type. Providing
// this mechanism for varying numbers of function types and arguments
// would ideally be done with C++11 variadic templates. However, due to
// a compiler bug with variadic templates and lambdas (referenced
// further below), we use the Boost preprocessor macros to accomplish
// the same thing (albeit less cleanly). See below for those
// definitions.
//
// Dispatching is done via a level of indirection. The dispatch
// routine itself creates a promise (when there is a result to report
// back) that is passed as an argument to a partially applied lambda
// (see the definitions below). The resulting callable gets passed to
// the actual process via an internal routine called, not surprisingly,
// 'dispatch', declared below:
namespace internal {
// The internal dispatch routine schedules a function to get invoked
// within the context of the process associated with the specified pid
// (first argument), unless that process is no longer valid. Note that
// this routine does not expect anything in particular about the
// specified function (second argument). The semantics are simple: the
// function gets applied/invoked with the process as its first
// argument.
//
// Ownership of the callable `f` is handed over to this routine (it is
// taken as a `std::unique_ptr` by value).
//
// The optional `functionType` defaults to `None()`; the overloads of
// `dispatch` in this header that take a method pointer pass
// `&typeid(method)` for it.
// NOTE(review): presumably this lets the implementation identify which
// method was dispatched -- confirm against the definition.
//
// This is only a declaration; the definition is not in this header.
void dispatch(
const UPID& pid,
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f,
const Option<const std::type_info*>& functionType = None());
// Forward declaration of the primary `Dispatch` template. Declaring it
// up front allows the specializations for `void` and `Future<R>` to
// appear before the primary definition, which follows them.
// NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
// function. See comments there for the reason why we need this.
template <typename R>
struct Dispatch;
// Partial specialization for callable objects returning `void` to be dispatched
// on a process.
// NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
// function. See comments there for the reason why we need this.
template <>
struct Dispatch<void>
{
template <typename F>
void operator()(const UPID& pid, F&& f)
{
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
new lambda::CallableOnce<void(ProcessBase*)>(
lambda::partial(
[](typename std::decay<F>::type&& f, ProcessBase*) {
std::move(f)();
},
std::forward<F>(f),
lambda::_1)));
internal::dispatch(pid, std::move(f_));
}
};
// Partial specialization for callable objects returning `Future<R>` to be
// dispatched on a process. The future handed back to the caller belongs
// to a promise created here; that promise gets associated with the
// future produced by the callable once it has run.
// NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
// function. See comments there for the reason why we need this.
template <typename R>
struct Dispatch<Future<R>>
{
  template <typename F>
  Future<R> operator()(const UPID& pid, F&& f)
  {
    typedef lambda::CallableOnce<void(ProcessBase*)> Thunk;
    typedef typename std::decay<F>::type Callable;

    std::unique_ptr<Promise<R>> promise(new Promise<R>());

    // Obtain the future before the promise is moved into the thunk.
    Future<R> future = promise->future();

    // The process argument is ignored; only the callable is invoked.
    auto run = [](std::unique_ptr<Promise<R>> owned,
                  Callable&& callable,
                  ProcessBase*) {
      owned->associate(std::move(callable)());
    };

    std::unique_ptr<Thunk> thunk(new Thunk(lambda::partial(
        std::move(run), std::move(promise), std::forward<F>(f), lambda::_1)));

    internal::dispatch(pid, std::move(thunk));

    return future;
  }
};
// Primary template: dispatches a callable object returning a plain
// value of type `R` on a process. The value returned by the callable is
// used to set a promise created here, whose future is handed back to
// the caller.
// NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
// function. See comments there for the reason why we need this.
template <typename R>
struct Dispatch
{
  template <typename F>
  Future<R> operator()(const UPID& pid, F&& f)
  {
    typedef lambda::CallableOnce<void(ProcessBase*)> Thunk;
    typedef typename std::decay<F>::type Callable;

    std::unique_ptr<Promise<R>> promise(new Promise<R>());

    // Obtain the future before the promise is moved into the thunk.
    Future<R> future = promise->future();

    // The process argument is ignored; only the callable is invoked.
    auto run = [](std::unique_ptr<Promise<R>> owned,
                  Callable&& callable,
                  ProcessBase*) {
      owned->set(std::move(callable)());
    };

    std::unique_ptr<Thunk> thunk(new Thunk(lambda::partial(
        std::move(run), std::move(promise), std::forward<F>(f), lambda::_1)));

    internal::dispatch(pid, std::move(thunk));

    return future;
  }
};
} // namespace internal {
// Okay, now for the definition of the dispatch routines
// themselves. For each kind of return type (void, future, plain value)
// the zero-argument version is written out by hand so the reader can
// see what the preprocessor macros are effectively providing for the
// versions that take arguments.
//
// First, definitions of dispatch for methods returning void:
template <typename T>
void dispatch(const PID<T>& pid, void (T::*method)())
{
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
new lambda::CallableOnce<void(ProcessBase*)>(
[=](ProcessBase* process) {
assert(process != nullptr);
T* t = dynamic_cast<T*>(process);
assert(t != nullptr);
(t->*method)();
}));
internal::dispatch(pid, std::move(f), &typeid(method));
}
// Convenience overload taking the process by reference: dispatches the
// void-returning `method` to the pid obtained from `process.self()`.
template <typename T>
void dispatch(const Process<T>& process, void (T::*method)())
{
  const auto& pid = process.self();
  dispatch(pid, method);
}
// Convenience overload taking the process by pointer: dispatches the
// void-returning `method` to the pid obtained from `process->self()`.
// The pointer is dereferenced without a null check.
template <typename T>
void dispatch(const Process<T>* process, void (T::*method)())
{
  const auto& pid = process->self();
  dispatch(pid, method);
}
// Due to a bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=41933)
// with variadic templates and lambdas, we still need to do
// preprocessor expansions.
// The following assumes base names for type and variable are `A` and `a`.
//
// Helper macros used by the `TEMPLATE` macros below to produce the
// N-th element of an argument-related list:
//   FORWARD: `std::forward<AN>(aN)`, forwards the N-th argument.
//   MOVE:    `std::move(aN)`, moves the N-th bound argument.
//   DECL:    `typename std::decay<AN>::type&& aN`, declares the N-th
//            lambda parameter as an rvalue reference to the decayed
//            argument type.
// The `Z` and `DATA` parameters are not used by these macros.
// NOTE(review): presumably they are required by the signature that
// `ENUM` expects -- confirm against stout/preprocessor.hpp.
// All three macros are undefined again at the end of this header.
#define FORWARD(Z, N, DATA) std::forward<A ## N>(a ## N)
#define MOVE(Z, N, DATA) std::move(a ## N)
#define DECL(Z, N, DATA) typename std::decay<A ## N>::type&& a ## N
// For a given arity N, generates three overloads of `dispatch` for
// methods that return void and take N parameters:
//   1. one taking a `PID<T>`, which does the actual work;
//   2. one taking a `Process<T>&`, forwarding to (1) via `self()`;
//   3. one taking a `Process<T>*`, forwarding to (1) via `self()`.
// `P0 .. P(N-1)` are the parameter types declared by the method, while
// `A0 .. A(N-1)` are the separately deduced types of the arguments
// passed by the caller. The arguments are bound into the callable with
// `lambda::partial` and are moved into the method call when the
// callable is run on the process.
// NOTE: comments must stay outside the macro body, since a `//` comment
// on a continued line would swallow the lines that follow it.
#define TEMPLATE(Z, N, DATA) \
template <typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
void dispatch( \
const PID<T>& pid, \
void (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \
new lambda::CallableOnce<void(ProcessBase*)>( \
lambda::partial( \
[method](ENUM(N, DECL, _), ProcessBase* process) { \
assert(process != nullptr); \
T* t = dynamic_cast<T*>(process); \
assert(t != nullptr); \
(t->*method)(ENUM(N, MOVE, _)); \
}, \
ENUM(N, FORWARD, _), \
lambda::_1))); \
\
internal::dispatch(pid, std::move(f), &typeid(method)); \
} \
\
template <typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
void dispatch( \
const Process<T>& process, \
void (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
dispatch(process.self(), method, ENUM(N, FORWARD, _)); \
} \
\
template <typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
void dispatch( \
const Process<T>* process, \
void (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
dispatch(process->self(), method, ENUM(N, FORWARD, _)); \
}
// Stamp out the overloads for each supported arity, then drop the
// macro so the name can be reused for the next family of overloads.
REPEAT_FROM_TO(1, 13, TEMPLATE, _) // Args A0 -> A11.
#undef TEMPLATE
// Next, definitions of methods returning a future:
//
// Dispatches the zero-argument `method` on the process identified by
// `pid`. The returned future belongs to a promise created here, which
// gets associated with the future returned by the method once it runs.
template <typename R, typename T>
Future<R> dispatch(const PID<T>& pid, Future<R> (T::*method)())
{
  typedef lambda::CallableOnce<void(ProcessBase*)> Thunk;

  std::unique_ptr<Promise<R>> promise(new Promise<R>());

  // Obtain the future before the promise is moved into the thunk.
  Future<R> future = promise->future();

  // When run, downcast the generic process to `T` and call the method.
  auto invoke = [method](std::unique_ptr<Promise<R>> owned,
                         ProcessBase* process) {
    assert(process != nullptr);
    T* t = dynamic_cast<T*>(process);
    assert(t != nullptr);
    owned->associate((t->*method)());
  };

  std::unique_ptr<Thunk> thunk(new Thunk(
      lambda::partial(std::move(invoke), std::move(promise), lambda::_1)));

  internal::dispatch(pid, std::move(thunk), &typeid(method));

  return future;
}
// Convenience overload taking the process by reference: dispatches the
// future-returning `method` to the pid obtained from `process.self()`.
template <typename R, typename T>
Future<R> dispatch(const Process<T>& process, Future<R> (T::*method)())
{
  const auto& pid = process.self();
  return dispatch(pid, method);
}
// Convenience overload taking the process by pointer: dispatches the
// future-returning `method` to the pid obtained from `process->self()`.
// The pointer is dereferenced without a null check.
template <typename R, typename T>
Future<R> dispatch(const Process<T>* process, Future<R> (T::*method)())
{
  const auto& pid = process->self();
  return dispatch(pid, method);
}
// For a given arity N, generates three overloads of `dispatch` for
// methods that return `Future<R>` and take N parameters:
//   1. one taking a `PID<T>`, which does the actual work;
//   2. one taking a `Process<T>&`, forwarding to (1) via `self()`;
//   3. one taking a `Process<T>*`, forwarding to (1) via `self()`.
// Overload (1) creates a promise, returns that promise's future to the
// caller, and associates the promise with the future returned by the
// method once the callable is run on the process. `P0 .. P(N-1)` are
// the parameter types declared by the method, while `A0 .. A(N-1)` are
// the separately deduced types of the arguments passed by the caller.
// This relies on the `FORWARD`, `MOVE` and `DECL` helper macros.
// NOTE: comments must stay outside the macro body, since a `//` comment
// on a continued line would swallow the lines that follow it.
#define TEMPLATE(Z, N, DATA) \
template <typename R, \
typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
Future<R> dispatch( \
const PID<T>& pid, \
Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
std::unique_ptr<Promise<R>> promise(new Promise<R>()); \
Future<R> future = promise->future(); \
\
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \
new lambda::CallableOnce<void(ProcessBase*)>( \
lambda::partial( \
[method](std::unique_ptr<Promise<R>> promise, \
ENUM(N, DECL, _), \
ProcessBase* process) { \
assert(process != nullptr); \
T* t = dynamic_cast<T*>(process); \
assert(t != nullptr); \
promise->associate( \
(t->*method)(ENUM(N, MOVE, _))); \
}, \
std::move(promise), \
ENUM(N, FORWARD, _), \
lambda::_1))); \
\
internal::dispatch(pid, std::move(f), &typeid(method)); \
\
return future; \
} \
\
template <typename R, \
typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
Future<R> dispatch( \
const Process<T>& process, \
Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
return dispatch(process.self(), method, ENUM(N, FORWARD, _)); \
} \
\
template <typename R, \
typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
Future<R> dispatch( \
const Process<T>* process, \
Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
return dispatch(process->self(), method, ENUM(N, FORWARD, _)); \
}
// Stamp out the overloads for each supported arity, then drop the
// macro so the name can be reused for the next family of overloads.
REPEAT_FROM_TO(1, 13, TEMPLATE, _) // Args A0 -> A11.
#undef TEMPLATE
// Next, definitions of methods returning a value.
//
// Dispatches the zero-argument `method` on the process identified by
// `pid`. The returned future belongs to a promise created here, which
// is set to the value returned by the method once it runs.
template <typename R, typename T>
Future<R> dispatch(const PID<T>& pid, R (T::*method)())
{
  typedef lambda::CallableOnce<void(ProcessBase*)> Thunk;

  std::unique_ptr<Promise<R>> promise(new Promise<R>());

  // Obtain the future before the promise is moved into the thunk.
  Future<R> future = promise->future();

  // When run, downcast the generic process to `T` and call the method.
  auto invoke = [method](std::unique_ptr<Promise<R>> owned,
                         ProcessBase* process) {
    assert(process != nullptr);
    T* t = dynamic_cast<T*>(process);
    assert(t != nullptr);
    owned->set((t->*method)());
  };

  std::unique_ptr<Thunk> thunk(new Thunk(
      lambda::partial(std::move(invoke), std::move(promise), lambda::_1)));

  internal::dispatch(pid, std::move(thunk), &typeid(method));

  return future;
}
// Convenience overload taking the process by reference: dispatches the
// value-returning `method` to the pid obtained from `process.self()`.
template <typename R, typename T>
Future<R> dispatch(const Process<T>& process, R (T::*method)())
{
  const auto& pid = process.self();
  return dispatch(pid, method);
}
// Convenience overload taking the process by pointer: dispatches the
// value-returning `method` to the pid obtained from `process->self()`.
// The pointer is dereferenced without a null check.
template <typename R, typename T>
Future<R> dispatch(const Process<T>* process, R (T::*method)())
{
  const auto& pid = process->self();
  return dispatch(pid, method);
}
// For a given arity N, generates three overloads of `dispatch` for
// methods that return a plain value of type `R` and take N parameters:
//   1. one taking a `PID<T>`, which does the actual work;
//   2. one taking a `Process<T>&`, forwarding to (1) via `self()`;
//   3. one taking a `Process<T>*`, forwarding to (1) via `self()`.
// Overload (1) creates a promise, returns that promise's future to the
// caller, and sets the promise to the value returned by the method once
// the callable is run on the process. `P0 .. P(N-1)` are the parameter
// types declared by the method, while `A0 .. A(N-1)` are the separately
// deduced types of the arguments passed by the caller.
// This relies on the `FORWARD`, `MOVE` and `DECL` helper macros.
// NOTE: comments must stay outside the macro body, since a `//` comment
// on a continued line would swallow the lines that follow it.
#define TEMPLATE(Z, N, DATA) \
template <typename R, \
typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
Future<R> dispatch( \
const PID<T>& pid, \
R (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
std::unique_ptr<Promise<R>> promise(new Promise<R>()); \
Future<R> future = promise->future(); \
\
std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \
new lambda::CallableOnce<void(ProcessBase*)>( \
lambda::partial( \
[method](std::unique_ptr<Promise<R>> promise, \
ENUM(N, DECL, _), \
ProcessBase* process) { \
assert(process != nullptr); \
T* t = dynamic_cast<T*>(process); \
assert(t != nullptr); \
promise->set((t->*method)(ENUM(N, MOVE, _))); \
}, \
std::move(promise), \
ENUM(N, FORWARD, _), \
lambda::_1))); \
\
internal::dispatch(pid, std::move(f), &typeid(method)); \
\
return future; \
} \
\
template <typename R, \
typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
Future<R> dispatch( \
const Process<T>& process, \
R (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
return dispatch(process.self(), method, ENUM(N, FORWARD, _)); \
} \
\
template <typename R, \
typename T, \
ENUM_PARAMS(N, typename P), \
ENUM_PARAMS(N, typename A)> \
Future<R> dispatch( \
const Process<T>* process, \
R (T::*method)(ENUM_PARAMS(N, P)), \
ENUM_BINARY_PARAMS(N, A, &&a)) \
{ \
return dispatch(process->self(), method, ENUM(N, FORWARD, _)); \
}
// Stamp out the overloads for each supported arity.
REPEAT_FROM_TO(1, 13, TEMPLATE, _) // Args A0 -> A11.
// This was the last family of generated overloads, so besides
// `TEMPLATE` the shared helper macros are undefined as well to keep
// them from leaking into files that include this header.
#undef TEMPLATE
#undef DECL
#undef MOVE
#undef FORWARD
// Dispatches an arbitrary nullary callable `f` on the process
// identified by `pid`.
//
// Function templates cannot be partially specialized, so the work is
// delegated to the class template `internal::Dispatch`, which is
// specialized as
//   - internal::Dispatch<void> vs
//   - internal::Dispatch<Future<R>> vs
//   - internal::Dispatch
// in order to determine whether R (the result type of calling `f`) is
// void, Future or other types. The return type follows the selected
// specialization.
template <typename F, typename R = typename result_of<F()>::type>
auto dispatch(const UPID& pid, F&& f)
  -> decltype(internal::Dispatch<R>()(pid, std::forward<F>(f)))
{
  internal::Dispatch<R> dispatcher;
  return dispatcher(pid, std::forward<F>(f));
}
} // namespace process {
#endif // __PROCESS_DISPATCH_HPP__