A process
is an actor, effectively a cross between a thread and an object.
Creating/spawning a process is very cheap (no actual thread gets created, and no thread stack gets allocated).
Each process has a queue of incoming events that it processes one at a time.
Processes provide execution contexts (only one thread executing within a process at a time so no need for per process synchronization).
## `delay`

Rather than dispatching a method for execution right away, `delay` allows it to be scheduled for execution after a certain time duration.
## `dispatch`

`dispatch` schedules a method for asynchronous execution on a process.
## `defer`

Objects like `Future` allow attaching callbacks that get executed synchronously on certain events, such as the completion of a future (e.g., `Future::then` and `Future::onReady`) or the failure of a future (e.g., `Future::onFailed`). It's usually desirable, however, to execute these callbacks asynchronously, and `defer` provides a mechanism to do so.

`defer` is similar to `dispatch`, but rather than enqueuing the execution of a method or function on the specified process immediately (i.e., synchronously), it returns a `Deferred`: a callable object that will dispatch the method or function on the specified process only when invoked. Said another way, using `defer` is a way to defer a `dispatch`.
As an example, consider the following function, which spawns a process and registers two callbacks on it, one using `defer` and another without it:
```cpp
using namespace process;

void foo()
{
  ProcessBase process;
  spawn(process);

  Deferred<void(int)> deferred = defer(
      process,
      [](int i) {
        // Invoked _asynchronously_ using `process` as the
        // execution context.
      });

  Promise<int> promise;
  promise.future().then(deferred);
  promise.future().then([](int i) {
    // Invoked synchronously from the execution context of
    // the thread that completes the future!
  });

  // Executes both callbacks synchronously, which _dispatches_
  // the deferred lambda to run asynchronously in the execution
  // context of `process` but invokes the other lambda immediately.
  promise.set(42);

  terminate(process);
}
```
As another example, consider this excerpt from the Mesos project's `src/master/master.cpp`:
```cpp
// Start contending to be a leading master and detecting the current leader.
// NOTE: `.onAny` passes the relevant future to its callback as a parameter,
// and `lambda::_1` facilitates this when using `defer`.
contender->contend()
  .onAny(defer(self(), &Master::contended, lambda::_1));
```
Why use `defer` in this context rather than just executing `Master::contended` synchronously? To answer this, we need to remember that when the promise associated with the future returned from `contender->contend()` is completed, all registered callbacks (i.e., the `Future::onAny` one in the example above) will be invoked synchronously, which may happen in a different process! Without `defer`, the process responsible for executing `contender->contend()` could cause `&Master::contended` to be executed simultaneously with (i.e., on a different thread than) the `Master` process! This creates the potential for a data race in which two threads access members of `Master` concurrently. Using `defer` (with `self()`) instead dispatches the method back to the `Master` process to be executed at a later point in time within the single-threaded execution context of the `Master`. Using `defer` here allows us to capture precisely these semantics.
A natural question that folks often ask is whether or not we ever *don't* want to use `defer(self(), ...)`, or even just `defer`. In some circumstances you don't actually need to defer back to your own process, but you often still want to defer. A good example of that is handling HTTP requests. Consider this example:
```cpp
using namespace process;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  virtual void initialize()
  {
    // NOTE: the lambda captures `this` so that it can call `self()`.
    route("/route", None(), [this](const http::Request& request) {
      return functionWhichReturnsAFutureOfString()
        .then(defer(self(), [](const string& s) {
          return http::OK("String returned in body: " + s);
        }));
    });
  }
};
```
Now, while this is totally legal and correct code, the callback executed after `functionWhichReturnsAFutureOfString` is completed does not need to be executed within the execution context of `HttpProcess`, because it doesn't require any state from `HttpProcess`! In this case, rather than forcing the execution of the callback within the execution context of `HttpProcess`, which would block other callbacks that must be executed by `HttpProcess`, we can simply run this lambda using an execution context that libprocess picks for us (from a pool of threads). We do so by removing `self()` as the first argument to `defer`:
```cpp
using namespace process;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  virtual void initialize()
  {
    route("/route", None(), [](const http::Request& request) {
      return functionWhichReturnsAFutureOfString()
        .then(defer([](const string& s) {
          return http::OK("String returned in body: " + s);
        }));
    });
  }
};
```
Note that even in this example we still want to use `defer`! Why? Because otherwise we would block the execution context (i.e., process, thread, etc.) that is completing the future, since it synchronously executes the callbacks. Instead, we want to let the callback get executed asynchronously by using `defer`.
Let's construct a simple example that illustrates a problem that can be introduced by omitting `defer` from callback registrations. We'll define a method for our `HttpProcess` class which accepts an input string and returns a future via an asynchronous method called `asyncStoreData`:
```cpp
using namespace process;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  // Returns the number of bytes stored.
  Future<int> inputHandler(const string& input);

protected:
  // Returns the number of bytes stored.
  Future<int> asyncStoreData(const string& input);

  int storedCount = 0;
};


Future<int> HttpProcess::inputHandler(const string& input)
{
  LOG(INFO) << "HttpProcess received input: " << input;

  return asyncStoreData(input)
    .then([this](int bytes) -> Future<int> {
      this->storedCount += bytes;

      LOG(INFO) << "Successfully stored input. "
                << "Total bytes stored so far: " << this->storedCount;

      return bytes;
    });
}
```
When the callback is registered on the `Future<int>` returned by `asyncStoreData`, a lambda is passed directly to `then`. This means that the lambda will be executed in whatever execution context eventually fulfills the future. If the future is fulfilled in a different execution context (i.e., inside a different libprocess process), then it's possible that the instance of `HttpProcess` that originally invoked `inputHandler` will have been destroyed, making `this` a dangling pointer. In order to avoid this possibility, the callback should be registered as follows:
```cpp
return asyncStoreData(input)
  .then(defer(self(), [this](int bytes) -> Future<int> {
    ...
  }));
```
The lambda is then guaranteed to execute within the execution context of the current process, and we know that `this` will still be a valid pointer. We should write libprocess code that makes no assumptions about the execution context in which a given future is fulfilled. Even if we can verify that a future will be fulfilled in the current process, registering a callback without `defer` makes the code more fragile by allowing the possibility that another contributor will make changes without considering the impact of those changes on the registered callbacks' execution contexts. Thus, `defer` should always be used. We offer the following rule to determine which form of `defer` should be used in a given situation:
* If the callback accesses the state of a process, use `defer(pid, callback)`, where `pid` is the PID of the process whose state is being accessed.
* Otherwise, use `defer(callback)`.

## `ID`

Generates a unique identifier string given a prefix. This is used to provide `PID` names.
## `PID`

A `PID` provides a level of indirection for naming a process without having an actual reference (pointer) to it (necessary for remote processes).
The `Future` and `Promise` primitives are used to enable programmers to write asynchronous, non-blocking, and highly concurrent software.

A `Future` acts as the read-side of a result which might be computed asynchronously. A `Promise`, on the other hand, acts as the write-side "container". We'll use some examples to explain the concepts.
First, you can construct a `Promise` of a particular type by doing the following:
```cpp
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;

  return 0;
}
```
A `Promise` is not copyable or assignable, in order to encourage strict ownership rules between processes (i.e., it's hard to reason about multiple actors concurrently trying to complete a `Promise`, even if it's safe to do so concurrently).
You can get a `Future` from a `Promise` using the `Promise::future()` method:
```cpp
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;

  Future<int> future = promise.future();

  return 0;
}
```
Note that the templated type of the future must be exactly the same as that of the promise: you cannot create a covariant or contravariant future. Unlike a `Promise`, a `Future` can be both copied and assigned:
```cpp
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;

  Future<int> future = promise.future();

  // You can copy a future.
  Future<int> future2 = future;

  // You can also assign a future (NOTE: this future will never
  // complete because the Promise goes out of scope, but the
  // Future is still valid and can be used normally.)
  future = Promise<int>().future();

  return 0;
}
```
The result encapsulated in the `Future`/`Promise` can be in one of four states: `PENDING`, `READY`, `FAILED`, or `DISCARDED`. When a `Promise` is first created the result is `PENDING`. When you complete a `Promise` using the `Promise::set()` method the result becomes `READY`:
```cpp
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;

  Future<int> future = promise.future();

  promise.set(42);

  CHECK(future.isReady());

  return 0;
}
```
*NOTE:* `CHECK` is a macro from glog which acts like an `assert` but prints a stack trace and does better signal management. In addition to `CHECK`, we've also created the wrapper macros `CHECK_PENDING`, `CHECK_READY`, `CHECK_FAILED`, and `CHECK_DISCARDED`, which enable you to more concisely do things like `CHECK_READY(future)` in your code. We'll use those throughout the rest of this guide.
TODO(benh):

* Using `Future` and `Promise` between actors, i.e., `dispatch` returning a `Future`
* `Promise::fail()`
* `Promise::discard()` and `Future::discard()`
* `Future::onReady()`, `Future::onFailed()`, `Future::onDiscarded()`
* `Future::then()`, `Future::repair()`, `Future::after()`
* `defer`
* `Future::await()`
libprocess provides facilities for communicating between actors via HTTP messages. With the advent of the HTTP API, HTTP is becoming the preferred mode of communication.
## `route`

`route` installs an HTTP endpoint onto a process. Let's define a simple process that installs an endpoint upon initialization:
```cpp
using namespace process;
using namespace process::http;

class HttpProcess : public Process<HttpProcess>
{
protected:
  virtual void initialize()
  {
    // `testing` is a user-defined handler (not shown) that maps the
    // query parameters to a `Future<Response>`.
    route("/testing", None(), [](const Request& request) {
      return testing(request.query);
    });
  }
};


class Http
{
public:
  Http() : process(new HttpProcess())
  {
    spawn(process.get());
  }

  virtual ~Http()
  {
    terminate(process.get());
    wait(process.get());
  }

  Owned<HttpProcess> process;
};
```
Now if our program instantiates this class, we can hit the endpoint with something like:

```shell
$ curl localhost:1234/testing?value=42
```
Note that the port at which this endpoint can be reached is the port libprocess has bound to, which is determined by the `LIBPROCESS_PORT` environment variable. In the case of the Mesos master or agent, this environment variable is set according to the `--port` command-line flag.
## `get`

`get` will hit an HTTP endpoint with a GET request and return a `Future` containing the response. We can pass it either a libprocess `UPID` or a `URL`. Here's an example hitting the endpoint, assuming we have a `UPID` named `upid`:
```cpp
Future<Response> future = get(upid, "testing");
```
Or let's assume our serving process has been set up on a remote server and we want to hit its endpoint. We'll construct a `URL` for the address and then call `get`:
```cpp
URL url = URL("http", "hostname", 1234, "/testing");

Future<Response> future = get(url);
```
## `post` and `requestDelete`

The `post` and `requestDelete` functions will similarly send POST and DELETE requests to an HTTP endpoint. Their invocation is analogous to `get`.
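For example, a sketch of sending a POST with a body, and then a DELETE, to the endpoint above (the body and payload here are our own; see `process/http.hpp` for the exact overloads):

```cpp
// POST a body to the "testing" endpoint of the process named by `upid`.
Future<Response> posted = post(
    upid,
    "testing",
    None(),       // No additional headers.
    "value=42");  // Request body (an example payload of our own).

// Send a DELETE request to the same endpoint.
Future<Response> deleted = requestDelete(upid, "testing");
```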
## `Connection`

A `Connection` represents a connection to an HTTP server. `connect` can be used to connect to a server, and returns a `Future` containing the `Connection`. Let's open a connection to a server and send some requests:
```cpp
// NOTE: the future is given its own name so that it doesn't shadow
// the `connect` function.
Future<Connection> future = connect(url);

future.await();

Connection connection = future.get();

Request request;
request.method = "GET";
request.url = url;
request.body = "Amazing prose goes here.";
request.keepAlive = true;

Future<Response> response = connection.send(request);
```
It's also worth noting that if multiple requests are sent in succession on a `Connection`, they will be automatically pipelined.
Asynchronous programs often use timeouts, e.g., because a process that initiates an asynchronous operation wants to take action if the operation hasn't completed within a certain time bound. To facilitate this, libprocess provides a set of abstractions that simplify writing timeout logic. Importantly, test code has the ability to manipulate the clock, in order to ensure that timeout logic is exercised (without needing to block the test program until the appropriate amount of system time has elapsed).
To invoke a function after a certain amount of time has elapsed, use `delay`:
```cpp
using namespace process;

using std::string;

class DelayedProcess : public Process<DelayedProcess>
{
public:
  void action(const string& name)
  {
    LOG(INFO) << "hello, " << name;

    promise.set(Nothing());
  }

  Promise<Nothing> promise;
};


int main()
{
  DelayedProcess process;

  spawn(process);

  LOG(INFO) << "Starting to wait";

  delay(Seconds(5), process.self(), &DelayedProcess::action, "Neil");

  AWAIT_READY(process.promise.future());

  LOG(INFO) << "Done waiting";

  terminate(process);
  wait(process);

  return 0;
}
```
This invokes the `action` function after (at least) five seconds of time have elapsed. When writing unit tests for this code, blocking the test for five seconds is undesirable. To avoid this, we can use `Clock::advance`:
```cpp
int main()
{
  DelayedProcess process;

  spawn(process);

  LOG(INFO) << "Starting to wait";

  Clock::pause();

  delay(Seconds(5), process.self(), &DelayedProcess::action, "Neil");

  Clock::advance(Seconds(5));

  AWAIT_READY(process.promise.future());

  LOG(INFO) << "Done waiting";

  terminate(process);
  wait(process);

  Clock::resume();

  return 0;
}
```
## `async`

`async` defines a function template for asynchronously executing function closures. It provides their results as futures.
There are a handful of compile-time optimizations that can be configured to improve the run queue and event queue performance. These are currently not enabled by default, as they are considered alpha. These optimizations include:

* `--enable-lock-free-run-queue` (autotools) or `-DENABLE_LOCK_FREE_RUN_QUEUE` (cmake), which enables the lock-free run queue implementation.
* `--enable-lock-free-event-queue` (autotools) or `-DENABLE_LOCK_FREE_EVENT_QUEUE` (cmake), which enables the lock-free event queue implementation.
* `--enable-last-in-first-out-fixed-size-semaphore` (autotools) or `-DENABLE_LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE` (cmake), which enables an optimized semaphore implementation.
Both the lock-free run queue implementation and the lock-free event queue implementation use `moodycamel::ConcurrentQueue`, which can be found here.
For the run queue we use a semaphore to block threads when there are no processes to run. On Linux we found that using a semaphore from glibc (i.e., `sem_init`, `sem_wait`, `sem_post`, etc.) had some performance issues. We discuss those performance issues and how our optimized semaphore overcomes them in more detail in `semaphore.hpp`.
The benchmark that we've used to drive the run queue and event queue performance improvements can be found in `benchmarks.cpp`. You can run the benchmark yourself by invoking `./benchmarks --gtest_filter=ProcessTest.*ThroughputPerformance`.