# Libprocess Developer Guide
*Note:* this Developer Guide is Work in Progress.
The library _libprocess_ provides high-level elements for an
actor programming style with asynchronous message handling, along with a
variety of related basic system primitives. Its API and
implementation are written in C++.
## Introduction
The design of libprocess is inspired by [Erlang](http://erlang.org),
a language that implements the
[actor model](http://en.wikipedia.org/wiki/Actor_model).
As the name already suggests, one of the libprocess core concepts is a
[Process](#process). This is a single threaded, independent actor which
communicates with other processes, locally and remotely, by sending and receiving [HTTP requests and responses](#http).
At a higher level, functional composition of processes is facilitated using [futures and promises](#futures-and-promises).
## Overview
### Table of Contents
* [Processes and the Asynchronous Pimpl Pattern](#processes)
* [Futures and Promises](#futures-and-promises)
* [HTTP](#http)
* [Clock Management and Timeouts](#clock)
* [Miscellaneous Primitives](#miscellaneous-primitives)
* [Optimized Run Queue and Event Queue](#optimized-run-queue-event-queue)
---
## <a name="processes"></a> Processes and the Asynchronous Pimpl Pattern
A `process` is an actor, effectively a cross between a thread and an object.
Creating/spawning a process is very cheap (no actual thread gets
created, and no thread stack gets allocated).
Each process has a queue of incoming events that it processes one at a
time.
Processes provide execution contexts: only one thread executes within
a process at a time, so there is no need for per-process synchronization.
<!---
~~~{.cpp}
using namespace process;

class MyProcess : public Process<MyProcess> {};

int main(int argc, char** argv)
{
  MyProcess process;
  spawn(process);
  terminate(process);
  wait(process);
  return 0;
}
~~~
---->
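
To make the "one event at a time" idea concrete, here is a toy model in plain standard C++. It is not libprocess code, and it does not reflect how libprocess schedules processes (libprocess multiplexes many processes over a pool of worker threads instead of giving each one a thread). `ToyProcess` and `countEvents` are hypothetical names. Many threads enqueue events, but only one worker runs them, so the handler can update `count` without a lock.

~~~{.cpp}
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Toy model (not libprocess): an object with a mailbox whose events are
// executed one at a time by a single dedicated worker thread.
class ToyProcess
{
public:
  ToyProcess() : worker([this] { run(); }) {}

  ~ToyProcess()
  {
    // Stop only after every previously enqueued event has run.
    enqueue([this] { done = true; });
    worker.join();
  }

  // Safe to call from any thread: only the mailbox itself is locked.
  void enqueue(std::function<void()> event)
  {
    {
      std::lock_guard<std::mutex> lock(mutex);
      mailbox.push_back(std::move(event));
    }
    cv.notify_one();
  }

  // Touched only by events, i.e., only by `worker`, so it needs no lock.
  int count = 0;

private:
  void run()
  {
    while (!done) {
      std::function<void()> event;
      {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return !mailbox.empty(); });
        event = std::move(mailbox.front());
        mailbox.pop_front();
      }
      event();  // Events never overlap: each runs to completion in turn.
    }
  }

  std::mutex mutex;
  std::condition_variable cv;
  std::deque<std::function<void()>> mailbox;
  bool done = false;   // Read and written only by `worker`.
  std::thread worker;  // Declared last so it starts after the members above.
};

// Enqueues `producers * perProducer` increments from several threads and
// returns the final count, read inside the process by one last event.
int countEvents(int producers, int perProducer)
{
  // Declared before `process`, so it outlives the worker thread.
  std::promise<int> total;
  std::future<int> result = total.get_future();

  ToyProcess process;

  std::vector<std::thread> threads;
  for (int p = 0; p < producers; ++p) {
    threads.emplace_back([&process, perProducer] {
      for (int i = 0; i < perProducer; ++i) {
        process.enqueue([&process] { process.count++; });
      }
    });
  }
  for (std::thread& thread : threads) {
    thread.join();
  }

  process.enqueue([&process, &total] { total.set_value(process.count); });
  return result.get();
}
~~~

For example, `countEvents(4, 1000)` returns 4000: every increment lands even though `count` is never locked, because only the worker thread ever touches it.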
### `delay`
`delay` is like [`dispatch`](#dispatch), but instead of scheduling a method for execution right away, it schedules it to run after a given duration has elapsed.
### `dispatch`
`dispatch` schedules a method for asynchronous execution.
<!---
~~~{.cpp}
using namespace process;

class QueueProcess : public Process<QueueProcess>
{
public:
  void enqueue(int i) { this->i = i; }
  int dequeue() { return this->i; }

private:
  int i;
};

int main(int argc, char** argv)
{
  QueueProcess process;
  spawn(process);

  dispatch(process, &QueueProcess::enqueue, 42);
  dispatch(process, &QueueProcess::enqueue, 43);

  ...;
}
~~~
---->
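
The following toy model (plain standard C++, not libprocess) shows what "asynchronous" means here: the call returns a future immediately, and the method only runs later, when the process gets to that event, much like `dispatch` can hand back a `Future` for a method's result. `ToyQueueProcess`, `runOne` and `toyDispatch` are hypothetical, and for simplicity the sketch assumes the process's events are drained manually on the calling thread.

~~~{.cpp}
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Toy model (not libprocess): a process whose events wait in a queue
// until someone calls `runOne`.
class ToyQueueProcess
{
public:
  void enqueue(int value) { i = value; }
  int dequeue() { return i; }

  // Runs the oldest pending event; returns false if there was none.
  bool runOne()
  {
    if (events.empty()) {
      return false;
    }
    std::function<void()> event = std::move(events.front());
    events.pop_front();
    event();
    return true;
  }

  std::deque<std::function<void()>> events;

private:
  int i = 0;
};

// Hypothetical `dispatch`-like helper: queues a call to `method` on
// `process` and immediately returns a future for its result.
template <typename R, typename... P, typename... A>
std::future<R> toyDispatch(
    ToyQueueProcess& process, R (ToyQueueProcess::*method)(P...), A... args)
{
  // `packaged_task` is move-only, so share it with the queued event.
  auto task = std::make_shared<std::packaged_task<R()>>(
      [&process, method, args...] { return (process.*method)(args...); });
  std::future<R> result = task->get_future();
  process.events.push_back([task] { (*task)(); });
  return result;
}
~~~

After `toyDispatch(process, &ToyQueueProcess::dequeue)` the returned future is still pending; it becomes ready only once `runOne` has executed the queued call.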
### `defer`
Objects like `Future` allow attaching callbacks that get executed
_synchronously_ on certain events, such as the completion of a future
(e.g., `Future::then` and `Future::onReady`) or the failure of a
future (e.g., `Future::onFailed`). It's usually desirable, however, to
execute these callbacks _asynchronously_, and `defer` provides a mechanism
to do so.
`defer` is similar to [`dispatch`](#dispatch), but rather than
enqueuing the execution of a method or function on the specified
process immediately (i.e., synchronously), it returns a `Deferred`,
which is a callable object that only after getting _invoked_ will
dispatch the method or function on the specified process. Said another
way, using `defer` is a way to _defer_ a `dispatch`.
As an example, consider the following function, which spawns a process
and registers two callbacks on it, one using `defer` and another
without it:
~~~{.cpp}
using namespace process;

void foo()
{
  ProcessBase process;
  spawn(process);

  Deferred<void(int)> deferred = defer(
      process,
      [](int i) {
        // Invoked _asynchronously_ using `process` as the
        // execution context.
      });

  Promise<int> promise;

  promise.future().then(deferred);

  promise.future().then([](int i) {
    // Invoked synchronously from the execution context of
    // the thread that completes the future!
  });

  // Executes both callbacks synchronously, which _dispatches_
  // the deferred lambda to run asynchronously in the execution
  // context of `process` but invokes the other lambda immediately.
  promise.set(42);

  terminate(process);
}
~~~
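
To see the difference in timing between the two callbacks above, here is a toy model in plain standard C++ (not libprocess). `ToyPromise::set` runs its callbacks synchronously, and the hypothetical `toyDefer` wraps a function so that invoking the wrapper only queues the function on a `ToyProcess` rather than running it.

~~~{.cpp}
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy model (not libprocess): a process is a queue of pending events,
// run in order when `runAll` is called.
struct ToyProcess
{
  std::deque<std::function<void()>> events;

  void runAll()
  {
    while (!events.empty()) {
      std::function<void()> event = std::move(events.front());
      events.pop_front();
      event();
    }
  }
};

// Toy promise: `set` invokes every registered callback synchronously,
// on whichever thread calls `set`.
class ToyPromise
{
public:
  void onReady(std::function<void(int)> callback)
  {
    callbacks.push_back(std::move(callback));
  }

  void set(int value)
  {
    for (const std::function<void(int)>& callback : callbacks) {
      callback(value);
    }
  }

private:
  std::vector<std::function<void(int)>> callbacks;
};

// Hypothetical stand-in for `defer(process, f)`: calling the result does
// not run `f`; it only queues `f(value)` on `process`.
std::function<void(int)> toyDefer(ToyProcess& process, std::function<void(int)> f)
{
  return [&process, f](int value) {
    process.events.push_back([f, value] { f(value); });
  };
}
~~~

Registering one plain callback and one `toyDefer`-wrapped callback and then calling `set(42)` runs the plain one immediately, while the wrapped one runs only when the process drains its queue with `runAll`.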
As another example, consider this excerpt from the Mesos project's
`src/master/master.cpp`:
~~~{.cpp}
// Start contending to be a leading master and detecting the current leader.
// NOTE: `.onAny` passes the relevant future to its callback as a parameter, and
// `lambda::_1` facilitates this when using `defer`.
contender->contend()
  .onAny(defer(self(), &Master::contended, lambda::_1));
~~~
Why use `defer` in this context rather than just executing
`Master::contended` synchronously? To answer this, we need to remember
that when the promise associated with the future returned from
`contender->contend()` is completed, that will synchronously invoke all
registered callbacks (i.e., the `Future::onAny` one in the example
above), _which may be in a different process_! Without using `defer`,
the process responsible for executing `contender->contend()` will
potentially cause `&Master::contended` to get executed concurrently with
(i.e., on a different thread than) the `Master` process! This creates
the potential for a data race in which two threads access members of
`Master` concurrently. Instead, using `defer` (with `self()`) will
dispatch the method _back_ to the `Master` process to be executed at a
later point in time within the single-threaded execution context of
the `Master`. Using `defer` here precisely allows us to capture these
semantics.
A natural question that folks often ask is whether we ever
_don't_ want to use `defer(self(), ...)`, or even just `defer`. In
some circumstances you don't need to defer back to your own
process, but you still often want to defer. A good example of this is
handling HTTP requests. Consider this example:
~~~{.cpp}
using namespace process;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  virtual void initialize()
  {
    // Capture `this` so that `self()` can be called inside the lambda.
    route("/route", None(), [this](const http::Request& request) {
      return functionWhichReturnsAFutureOfString()
        .then(defer(self(), [](const string& s) -> http::Response {
          return http::OK("String returned in body: " + s);
        }));
    });
  }
};
~~~
Now, while this is totally legal and correct code, the callback
executed after `functionWhichReturnsAFutureOfString` is completed
_does not_ need to be executed within the execution context of
`HttpProcess` because it doesn't require any state from `HttpProcess`!
In this case, rather than forcing the execution of the callback within
the execution context of `HttpProcess`, which will block other
callbacks _that must_ be executed by `HttpProcess`, we can simply
run this lambda using an execution context that libprocess can pick
for us (from a pool of threads). We do so by removing `self()` as the
first argument to `defer`:
~~~{.cpp}
using namespace process;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  virtual void initialize()
  {
    route("/route", None(), [](const http::Request& request) {
      return functionWhichReturnsAFutureOfString()
        .then(defer([](const string& s) -> http::Response {
          return http::OK("String returned in body: " + s);
        }));
    });
  }
};
~~~
_Note that even in this example_ we still want to use `defer`! Why?
Because otherwise we block the execution context (i.e.,
process, thread, etc.) that is _completing_ the future, since it
synchronously executes the callbacks! Instead, we want the
callback to be executed asynchronously by using `defer`.
Let's construct a simple example that illustrates a problem that can
be introduced by omitting `defer` from callback registrations. We'll
define a method for our `HttpProcess` class which accepts an input
string and returns a future via an asynchronous method called
`asyncStoreData`:
~~~{.cpp}
using namespace process;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  // Returns the number of bytes stored.
  Future<int> inputHandler(const string& input);

protected:
  // Returns the number of bytes stored.
  Future<int> asyncStoreData(const string& input);

  int storedCount = 0;
};

Future<int> HttpProcess::inputHandler(const string& input)
{
  LOG(INFO) << "HttpProcess received input: " << input;

  return asyncStoreData(input)
    .then([this](int bytes) -> Future<int> {
      this->storedCount += bytes;

      LOG(INFO) << "Successfully stored input. "
                << "Total bytes stored so far: " << this->storedCount;

      return bytes;
    });
}
~~~
When the callback is registered on the `Future<int>` returned by
`asyncStoreData`, a lambda is passed directly to `then`. This means
that the lambda will be executed in whatever execution context
eventually fulfills the future. If the future is fulfilled in a
different execution context (i.e., inside a different libprocess
process), then it's possible that the instance of `HttpProcess` that
originally invoked `inputHandler` will have been destroyed, making
`this` a dangling pointer. In order to avoid this possibility, the
callback should be registered as follows:
~~~{.cpp}
return asyncStoreData(input)
  .then(defer(self(), [this](int bytes) -> Future<int> {
    ...
  }));
~~~
The lambda is then guaranteed to execute within the execution context
of the current process, and we know that `this` will still be a valid
pointer. We should write libprocess code that makes no assumptions
about the execution context in which a given future is fulfilled. Even
if we can verify that a future will be fulfilled in the current
process, registering a callback without `defer` makes the code more
fragile by allowing the possibility that another contributor will make
changes without considering the impact of those changes on the
registered callbacks' execution contexts. Thus, `defer` should always
be used. We offer the following rule to determine which form of
`defer` should be used in a given situation:
* If the callback being registered accesses the state of a process,
then it should be registered using `defer(pid, callback)`, where
`pid` is the PID of the process whose state is being accessed.
* If the callback doesn't access any process state, or only makes use
of process variables that are captured _by value_ so that the
context of the process is not directly accessed when the callback is
executed, then it can be run in an arbitrary execution context
chosen by libprocess, and it should be registered using
`defer(callback)`.
### `ID`
Generates a unique identifier string given a prefix. This is used to
provide `PID` names.
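
As a rough illustration, a generator of this kind can keep one counter per prefix. The `generateId` function and the `prefix(N)` format below are assumptions made for this sketch, not a description of how libprocess's `ID::generate` is implemented.

~~~{.cpp}
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical unique-ID generator: one counter per prefix, guarded by a
// mutex so that concurrent callers never receive the same ID.
std::string generateId(const std::string& prefix)
{
  static std::mutex mutex;
  static std::map<std::string, int> counters;

  std::lock_guard<std::mutex> lock(mutex);
  int id = ++counters[prefix];
  return prefix + "(" + std::to_string(id) + ")";
}
~~~

With this scheme, two processes created with the prefix `queue` would be named `queue(1)` and `queue(2)`, and a different prefix starts its own count.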
### `PID`
A `PID` provides a level of indirection for naming a process without
having an actual reference (pointer) to it (necessary for remote
processes).
<!---
~~~{.cpp}
using namespace process;

int main(int argc, char** argv)
{
  QueueProcess process;
  spawn(process);

  PID<QueueProcess> pid = process.self();
  dispatch(pid, &QueueProcess::enqueue, 42);

  terminate(pid);
  wait(pid);
  return 0;
}
~~~
---->
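
The benefit of naming a process rather than pointing at it can be sketched with a toy registry in plain standard C++ (not libprocess). `ToyRegistry`, `ToyPid` and `Counter` are hypothetical: a `ToyPid` is just a name, so it can be copied and kept after the process it names is gone, and in this toy a dispatch to a name with no live process is dropped instead of touching freed memory.

~~~{.cpp}
#include <cassert>
#include <functional>
#include <map>
#include <string>

// Toy process state.
struct Counter
{
  int value = 0;
};

// Toy PID: only a name, so it can be copied freely and outlive the process.
struct ToyPid
{
  std::string id;
};

// Toy registry (not libprocess): maps names to live processes.
class ToyRegistry
{
public:
  void spawn(const ToyPid& pid, Counter* process) { processes[pid.id] = process; }
  void terminate(const ToyPid& pid) { processes.erase(pid.id); }

  // Delivers `event` if `pid` names a live process; otherwise drops it.
  // Returns whether the event was delivered.
  bool dispatch(const ToyPid& pid, const std::function<void(Counter&)>& event)
  {
    auto it = processes.find(pid.id);
    if (it == processes.end()) {
      return false;
    }
    event(*it->second);
    return true;
  }

private:
  std::map<std::string, Counter*> processes;
};
~~~

Holding only a `ToyPid`, a caller can keep dispatching safely: once `terminate` has removed the process, further dispatches simply return `false`.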
---
## <a name="futures-and-promises"></a> Futures and Promises
The `Future` and `Promise` primitives are used to enable
programmers to write asynchronous, non-blocking, and highly
concurrent software.
A `Future` acts as the read-side of a result which might be
computed asynchronously. A `Promise`, on the other hand, acts
as the write-side "container". We'll use some examples to
explain the concepts.
First, you can construct a `Promise` of a particular type by
doing the following:
~~~{.cpp}
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;
  return 0;
}
~~~
A `Promise` is not copyable or assignable, in order to encourage
strict ownership rules between processes (i.e., it's hard to
reason about multiple actors concurrently trying to complete a
`Promise`, even if it's safe to do so concurrently).
You can get a `Future` from a `Promise` using the
`Promise::future()` method:
~~~{.cpp}
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;
  Future<int> future = promise.future();
  return 0;
}
~~~
Note that the templated type of the future must be exactly the
same as the promise: you cannot create a covariant or
contravariant future. Unlike `Promise`, a `Future` can be both
copied and assigned:
~~~{.cpp}
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;

  Future<int> future = promise.future();

  // You can copy a future.
  Future<int> future2 = future;

  // You can also assign a future (NOTE: this future will never
  // complete because the Promise goes out of scope, but the
  // Future is still valid and can be used normally.)
  future = Promise<int>().future();

  return 0;
}
~~~
The result encapsulated in the `Future`/`Promise` can be in one
of four states: `PENDING`, `READY`, `FAILED`, `DISCARDED`. When
a `Promise` is first created the result is `PENDING`. When you
complete a `Promise` using the `Promise::set()` method the
result becomes `READY`:
~~~{.cpp}
using namespace process;

int main(int argc, char** argv)
{
  Promise<int> promise;

  Future<int> future = promise.future();

  promise.set(42);

  CHECK(future.isReady());

  return 0;
}
~~~
> NOTE: `CHECK` is a macro from `glog` which acts like an
> `assert` but prints a stack trace and does better signal
> management. In addition to `CHECK`, we've also created
> wrapper macros `CHECK_PENDING`, `CHECK_READY`, `CHECK_FAILED`,
> `CHECK_DISCARDED` which enable you to more concisely do
> things like `CHECK_READY(future)` in your code. We'll use
> those throughout the rest of this guide.
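
The rule that a result leaves `PENDING` exactly once can be modeled as a small state machine. This is a toy in plain standard C++, not libprocess's `Promise`; `ToyPromise` and `State` are hypothetical names, and the sketch assumes that only the first completion attempt takes effect.

~~~{.cpp}
#include <cassert>
#include <string>

// The four states a result can be in.
enum class State { PENDING, READY, FAILED, DISCARDED };

// Toy promise (not libprocess): only the first completion takes effect.
class ToyPromise
{
public:
  bool set(int v)
  {
    if (!complete(State::READY)) {
      return false;
    }
    value = v;
    return true;
  }

  bool fail(const std::string& why)
  {
    if (!complete(State::FAILED)) {
      return false;
    }
    message = why;
    return true;
  }

  bool discard() { return complete(State::DISCARDED); }

  State state = State::PENDING;
  int value = 0;
  std::string message;

private:
  // Moves out of PENDING; refuses once the result is already complete.
  bool complete(State next)
  {
    if (state != State::PENDING) {
      return false;
    }
    state = next;
    return true;
  }
};
~~~

Calling `fail` after a successful `set(42)` returns `false` and leaves the result `READY` with value 42.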
TODO(benh):
* Using `Future` and `Promise` between actors, i.e., `dispatch` returning a `Future`
* `Promise::fail()`
* `Promise::discard()` and `Future::discard()`
* `Future::onReady()`, `Future::onFailed()`, `Future::onDiscarded()`
* `Future::then()`, `Future::repair()`, `Future::after`
* `defer`
* `Future::await()`
<!--
#### `Future::then`
`Future::then` allows invoking callbacks once a future is completed.
~~~{.cpp}
using namespace process;

int main(int argc, char** argv)
{
  ...;

  Future<int> i = dispatch(process, &QueueProcess<int>::dequeue);

  dispatch(process, &QueueProcess<int>::enqueue, 42);

  i.then([] (int i) {
    // Use 'i'.
  });

  ...;
}
~~~
### `Promise`
A `promise` is an object that can fulfill a [future](#futures-and-promises), i.e., assign a result value to it.
~~~{.cpp}
using namespace process;

template <typename T>
class QueueProcess : public Process<QueueProcess<T>>
{
public:
  Future<T> dequeue()
  {
    return promise.future();
  }

  void enqueue(T t)
  {
    promise.set(t);
  }

private:
  Promise<T> promise;
};

int main(int argc, char** argv)
{
  ...;

  Future<int> i = dispatch(process, &QueueProcess<int>::dequeue);

  dispatch(process, &QueueProcess<int>::enqueue, 42);

  i.await();

  ...;
}
~~~
-->
## <a name="http"></a> HTTP
libprocess provides facilities for communicating between actors via HTTP
messages. With the advent of the HTTP API, HTTP is becoming the preferred mode
of communication.
### `route`
`route` installs an HTTP endpoint onto a process. Let's define a simple process
that installs an endpoint upon initialization:
~~~{.cpp}
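using namespace process;
using namespace process::http;

using std::string;

class HttpProcess : public Process<HttpProcess>
{
public:
  // libprocess serves this process's routes under its ID, "example".
  HttpProcess() : ProcessBase("example") {}

protected:
  virtual void initialize()
  {
    route("/testing", None(), [this](const Request& request) {
      return testing(request.query);
    });
  }

private:
  // Echoes the `value` query parameter back in the response body.
  Future<Response> testing(const hashmap<string, string>& query)
  {
    return OK("value: " + query.get("value").getOrElse("(none)"));
  }
};

class Http
{
public:
  Http() : process(new HttpProcess())
  {
    spawn(process.get());
  }

  virtual ~Http()
  {
    terminate(process.get());
    wait(process.get());
  }

  Owned<HttpProcess> process;
};
~~~
Now if our program instantiates this class, we can reach the endpoint, which
libprocess serves under the process's ID (`example`):
`$ curl localhost:1234/example/testing?value=42`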
Note that the port at which this endpoint can be reached is the port libprocess
has bound to, which is determined by the `LIBPROCESS_PORT` environment variable.
In the case of the Mesos master or agent, this environment variable is set
according to the `--port` command-line flag.
### `get`
`get` will hit an HTTP endpoint with a GET request and return a `Future`
containing the response. We can pass it either a libprocess `UPID` or a `URL`.
Here's an example hitting the endpoint assuming we have a `UPID` named `upid`:
~~~{.cpp}
Future<Response> future = get(upid, "testing");
~~~
Or let's assume our serving process has been set up on a remote server and we
want to hit its endpoint. We'll construct a `URL` for the address and then call
`get`:
~~~{.cpp}
// Assumes the remote process was spawned with the ID "example".
URL url = URL("http", "hostname", 1234, "/example/testing");
Future<Response> future = get(url);
~~~
### `post` and `requestDelete`
The `post` and `requestDelete` functions will similarly send POST and DELETE
requests to an HTTP endpoint. Their invocation is analogous to `get`.
### `Connection`
A `Connection` represents a connection to an HTTP server. `connect`
can be used to connect to a server, and returns a `Future` containing the
`Connection`. Let's open a connection to a server and send some requests:
~~~{.cpp}
// Block until the connection is established.
Future<Connection> connected = connect(url);
connected.await();

Connection connection = connected.get();

Request request;
request.method = "POST";
request.url = url;
request.body = "Amazing prose goes here.";
request.keepAlive = true;

Future<Response> response = connection.send(request);
~~~
It's also worth noting that if multiple requests are sent in succession on a
`Connection`, they will be automatically pipelined.
## <a name="clock"></a> Clock Management and Timeouts
Asynchronous programs often use timeouts, e.g., because a process that initiates
an asynchronous operation wants to take action if the operation hasn't completed
within a certain time bound. To facilitate this, libprocess provides a set of
abstractions that simplify writing timeout logic. Importantly, test code has the
ability to manipulate the clock, in order to ensure that timeout logic is
exercised (without needing to block the test program until the appropriate
amount of system time has elapsed).
To invoke a function after a certain amount of time has elapsed, use `delay`:
~~~{.cpp}
using namespace process;

using std::string;

class DelayedProcess : public Process<DelayedProcess>
{
public:
  void action(const string& name)
  {
    LOG(INFO) << "hello, " << name;

    promise.set(Nothing());
  }

  Promise<Nothing> promise;
};

int main()
{
  DelayedProcess process;

  spawn(process);

  LOG(INFO) << "Starting to wait";

  delay(Seconds(5), process.self(), &DelayedProcess::action, "Neil");

  // Block until `action` has completed the promise.
  process.promise.future().await();

  LOG(INFO) << "Done waiting";

  terminate(process);
  wait(process);

  return 0;
}
~~~
This invokes the `action` method after at least five seconds have
elapsed. When writing unit tests for this code, blocking the test for five
seconds is undesirable. To avoid this, we can pause the clock and use `Clock::advance`:
~~~{.cpp}
TEST(DelayedProcessTest, Action)
{
  DelayedProcess process;

  spawn(process);

  LOG(INFO) << "Starting to wait";

  // While the clock is paused, time only moves when we advance it.
  Clock::pause();

  delay(Seconds(5), process.self(), &DelayedProcess::action, "Neil");

  // Fire the five-second timer without actually waiting five seconds.
  Clock::advance(Seconds(5));

  AWAIT_READY(process.promise.future());

  LOG(INFO) << "Done waiting";

  terminate(process);
  wait(process);

  Clock::resume();
}
~~~
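
The idea behind `Clock::pause` and `Clock::advance` can be modeled with a manual clock whose time only moves when told to. This toy is plain standard C++; `ManualClock` is hypothetical and unrelated to libprocess's actual `Clock` implementation. It keeps pending timers ordered by deadline and fires every due timer during `advance`, so a five-second delay completes without any real waiting.

~~~{.cpp}
#include <cassert>
#include <functional>
#include <map>
#include <utility>

// Toy model (not libprocess's Clock): time advances only when told to.
class ManualClock
{
public:
  // Schedules `f` to run once `delayMs` milliseconds of toy time elapse.
  void delay(long delayMs, std::function<void()> f)
  {
    timers.emplace(nowMs + delayMs, std::move(f));
  }

  // Moves time forward and fires every due timer in deadline order.
  void advance(long ms)
  {
    nowMs += ms;
    while (!timers.empty() && timers.begin()->first <= nowMs) {
      std::function<void()> f = std::move(timers.begin()->second);
      timers.erase(timers.begin());
      f();
    }
  }

  long now() const { return nowMs; }

private:
  long nowMs = 0;
  std::multimap<long, std::function<void()>> timers;  // Deadline -> callback.
};
~~~

A timer scheduled with `delay(5000, ...)` stays pending after `advance(4999)` and fires on the next `advance(1)`, mirroring how the test above fires the delayed `action` with `Clock::advance(Seconds(5))`.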
## <a name="miscellaneous-primitives"></a> Miscellaneous Primitives
### `async`
`async` defines a function template for asynchronously executing
function closures. It provides their results as
[futures](#futures-and-promises).
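
For comparison, the C++ standard library offers a similar facility, `std::async`, which runs a closure (with `std::launch::async`, on a new thread) and returns a `std::future` for its result. The sketch below uses only the standard library; `squareAsync` and `ranOnAnotherThread` are hypothetical helpers, and this is not libprocess's `async` API.

~~~{.cpp}
#include <cassert>
#include <future>
#include <thread>

// Standard-library analog (not libprocess's `async`): run a closure on
// another thread and get its result back through a future.
std::future<int> squareAsync(int x)
{
  return std::async(std::launch::async, [x] { return x * x; });
}

// Reports whether `std::launch::async` ran the closure on a thread other
// than the caller's.
bool ranOnAnotherThread()
{
  std::thread::id caller = std::this_thread::get_id();
  std::future<std::thread::id> worker =
      std::async(std::launch::async, [] { return std::this_thread::get_id(); });
  return worker.get() != caller;
}
~~~

`squareAsync(7).get()` blocks until the closure finishes and then yields 49.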
## <a name="optimized-run-queue-event-queue"></a> Optimized Run Queue and Event Queue
There are a handful of compile-time optimizations that can be
configured to improve the run queue and event queue performance. These
are currently not enabled by default as they are considered
***alpha***. These optimizations include:
* `--enable-lock-free-run-queue` (autotools) or
`-DENABLE_LOCK_FREE_RUN_QUEUE` (cmake) which enables the lock-free
run queue implementation.
* `--enable-lock-free-event-queue` (autotools) or
`-DENABLE_LOCK_FREE_EVENT_QUEUE` (cmake) which enables the lock-free
event queue implementation.
* `--enable-last-in-first-out-fixed-size-semaphore` (autotools) or
`-DENABLE_LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE` (cmake) which
enables an optimized semaphore implementation.
#### Details
Both the lock-free run queue implementation and the lock-free event
queue implementation use `moodycamel::ConcurrentQueue` which can be
found [here](https://github.com/cameron314/concurrentqueue).
For the run queue we use a semaphore to block threads when there are
no processes to run. On Linux we found that using a semaphore
from glibc (e.g., `sem_init`, `sem_wait`, `sem_post`) had some
performance issues. We discuss those performance issues and how our
optimized semaphore overcomes them in more detail in
[semaphore.hpp](https://github.com/apache/mesos/blob/master/3rdparty/libprocess/src/semaphore.hpp#L191).
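
The role of the semaphore in the run queue can be sketched with a mutex-protected queue: each enqueue signals the semaphore once and each dequeue waits on it first, so idle workers sleep instead of spinning. This toy is plain standard C++; `Semaphore`, `RunQueue` and `sumWithWorkers` are hypothetical, and it uses a simple condition-variable semaphore rather than the lock-free queue or the optimized semaphore described above.

~~~{.cpp}
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting semaphore built from a mutex and a condition variable.
class Semaphore
{
public:
  void signal()
  {
    {
      std::lock_guard<std::mutex> lock(mutex);
      ++count;
    }
    cv.notify_one();
  }

  void wait()
  {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [this] { return count > 0; });
    --count;
  }

private:
  std::mutex mutex;
  std::condition_variable cv;
  int count = 0;
};

// Toy run queue: each item is pushed before its signal, so a `dequeue`
// that returns from `wait` always finds an item to pop.
class RunQueue
{
public:
  void enqueue(int item)
  {
    {
      std::lock_guard<std::mutex> lock(mutex);
      items.push_back(item);
    }
    semaphore.signal();  // Wake one blocked worker, if any.
  }

  int dequeue()
  {
    semaphore.wait();  // Sleeps until at least one item is available.
    std::lock_guard<std::mutex> lock(mutex);
    int item = items.front();
    items.pop_front();
    return item;
  }

private:
  std::mutex mutex;
  std::deque<int> items;
  Semaphore semaphore;
};

// Starts `workers` threads that each dequeue `perWorker` items, then
// enqueues the items 1..workers*perWorker and returns their total.
long long sumWithWorkers(int workers, int perWorker)
{
  RunQueue queue;
  std::atomic<long long> sum(0);

  std::vector<std::thread> threads;
  for (int w = 0; w < workers; ++w) {
    threads.emplace_back([&queue, &sum, perWorker] {
      for (int i = 0; i < perWorker; ++i) {
        sum += queue.dequeue();
      }
    });
  }

  for (int item = 1; item <= workers * perWorker; ++item) {
    queue.enqueue(item);
  }

  for (std::thread& thread : threads) {
    thread.join();
  }
  return sum.load();
}
~~~

With four workers draining 250 items each, every one of the items 1 through 1000 is consumed exactly once, so `sumWithWorkers(4, 250)` returns 500500.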
#### Benchmark
The benchmark that we've used to drive the run queue and event queue
performance improvements can be found in
[benchmarks.cpp](https://github.com/apache/mesos/blob/master/3rdparty/libprocess/src/tests/benchmarks.cpp#L426). You
can run the benchmark yourself by invoking
`./benchmarks --gtest_filter=ProcessTest.*ThroughputPerformance`.