#include <errno.h>
#include <ev.h>
#include <limits.h>
#include <libgen.h>
#include <netdb.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <arpa/inet.h>

#include <glog/logging.h>

#include <netinet/in.h>
#include <netinet/tcp.h>

#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>

#include <algorithm>
#include <deque>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <list>
#include <map>
#include <queue>
#include <set>
#include <sstream>
#include <stack>
#include <stdexcept>
#include <vector>

#include <boost/shared_array.hpp>

#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/executor.hpp>
#include <process/filter.hpp>
#include <process/future.hpp>
#include <process/gc.hpp>
#include <process/help.hpp>
#include <process/id.hpp>
#include <process/io.hpp>
#include <process/logging.hpp>
#include <process/mime.hpp>
#include <process/node.hpp>
#include <process/process.hpp>
#include <process/profiler.hpp>
#include <process/socket.hpp>
#include <process/statistics.hpp>
#include <process/system.hpp>
#include <process/time.hpp>
#include <process/timer.hpp>

#include <process/metrics/metrics.hpp>

#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/memory.hpp> // TODO(benh): Replace shared_ptr with unique_ptr.
#include <stout/net.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/strings.hpp>
#include <stout/thread.hpp>
#include <stout/unreachable.hpp>

#include "config.hpp"
#include "decoder.hpp"
#include "encoder.hpp"
#include "gate.hpp"
#include "process_reference.hpp"
#include "synchronized.hpp"

using namespace process::metrics::internal;

using process::wait; // Necessary on some OS's to disambiguate.

using process::http::Accepted;
using process::http::BadRequest;
using process::http::InternalServerError;
using process::http::NotFound;
using process::http::OK;
using process::http::Request;
using process::http::Response;
using process::http::ServiceUnavailable;

using std::deque;
using std::find;
using std::list;
using std::map;
using std::ostream;
using std::pair;
using std::queue;
using std::set;
using std::stack;
using std::string;
using std::stringstream;
using std::vector;

namespace process {

namespace ID {

// Returns an identifier of the form "prefix(N)" where N is the number
// of identifiers generated so far for 'prefix' (the first one is 1).
string generate(const string& prefix)
{
  // Per-prefix counters; access is serialized via the 'prefixes'
  // synchronizable declared right below.
  static map<string, int>* prefixes = new map<string, int>();
  static synchronizable(prefixes) = SYNCHRONIZED_INITIALIZER;

  int id;
  synchronized (prefixes) {
    // operator[] value-initializes a missing counter to zero, so
    // pre-incrementing yields 1 for a prefix seen for the first time.
    id = ++(*prefixes)[prefix];
  }

  return prefix + "(" + stringify(id) + ")";
}

} // namespace ID {


namespace mime {

// Definition of the global 'types' map; it is empty at this point.
// NOTE(review): presumably maps a file extension to its MIME type and
// is populated elsewhere -- confirm against process/mime.hpp.
map<string, string> types;

} // namespace mime {


// Provides a process that manages sending HTTP responses so as to
// satisfy HTTP/1.1 pipelining. Each request should either enqueue a
// response, or ask the proxy to handle a future response. The process
// is responsible for making sure the responses are sent in the same
// order as the requests. Note that we use a 'Socket' in order to keep
// the underlying file descriptor from getting closed while there
// might still be outstanding responses even though the client might
// have closed the connection (see more discussion in
// SocketManager::close and SocketManager::proxy).
class HttpProxy : public Process<HttpProxy>
{
public:
  explicit HttpProxy(const Socket& _socket);
  virtual ~HttpProxy();

  // Enqueues the response to be sent once all previously enqueued
  // responses have been processed (e.g., waited for and sent).
  void enqueue(const Response& response, const Request& request);

  // Enqueues a future to a response that will get waited on (up to
  // some timeout) and then sent once all previously enqueued
  // responses have been processed (e.g., waited for and sent).
  // NOTE(review): 'future' is a raw pointer; presumably ownership is
  // handed to an 'Item' (whose destructor deletes it) -- confirm
  // against the definition of this method.
  void handle(Future<Response>* future, const Request& request);

private:
  // Starts "waiting" on the next available future response.
  void next();

  // Invoked once a future response has been satisfied.
  void waited(const Future<Response>& future);

  // Demuxes and handles a response.
  bool process(const Future<Response>& future, const Request& request);

  // Handles stream (i.e., pipe) based responses.
  void stream(const Future<short>& poll, const Request& request);

  Socket socket; // Wrap the socket to keep it from getting closed.

  // Describes a queue "item" that wraps the future to the response
  // and the original request.
  // The original request contains needed information such as what encodings
  // are acceptable and whether to persist the connection.
  struct Item
  {
    // Stores a copy of '_request' and takes ownership of '_future'
    // (it is deleted in the destructor).
    Item(const Request& _request, Future<Response>* _future)
      : request(_request), future(_future) {}

    // NOTE(review): no copy constructor / assignment operator is
    // declared, so copying an Item would delete 'future' twice. Items
    // are only stored by pointer in 'items' below.
    ~Item()
    {
      delete future;
    }

    // Helper for cleaning up a response (i.e., closing any open pipes
    // in the event Response::type is PIPE).
    static void cleanup(const Response& response)
    {
      if (response.type == Response::PIPE) {
        os::close(response.pipe);
      }
    }

    const Request request; // Make a copy.
    Future<Response>* future; // Owned; deleted in ~Item.
  };

  // Pending responses, in the order the requests were enqueued.
  queue<Item*> items;

  Option<int> pipe; // Current pipe, if streaming.
};


// Helper for creating routes without a process.
// TODO(benh): Move this into route.hpp.
class Route
{
public:
  // Spawns a helper process named after 'name' (with any leading "/"
  // removed) whose "/" route forwards every request to 'handler'.
  // 'help' is passed along when the route is installed.
  Route(const string& name,
        const Option<string>& help,
        const lambda::function<Future<Response>(const Request&)>& handler)
    : process(new RouteProcess(name, help, handler))
  {
    spawn(process);
  }

  // Terminates the helper process, waits for it to finish and then
  // releases it.
  ~Route()
  {
    terminate(process);
    wait(process);

    // The process was allocated with 'new' in the constructor and is
    // spawned without handing it off for management, so it has to be
    // deleted here once 'wait' has returned; otherwise it leaks.
    delete process;
  }

private:
  // Process that owns the route and delegates requests to 'handler'.
  class RouteProcess : public Process<RouteProcess>
  {
  public:
    RouteProcess(
        const string& name,
        const Option<string>& _help,
        const lambda::function<Future<Response>(const Request&)>& _handler)
      : ProcessBase(strings::remove(name, "/", strings::PREFIX)),
        help(_help),
        handler(_handler) {}

  protected:
    // Installs the single "/" route when the process is initialized.
    virtual void initialize()
    {
      route("/", help, &RouteProcess::handle);
    }

    // Forwards the request to the user supplied handler.
    Future<Response> handle(const Request& request)
    {
      return handler(request);
    }

    const Option<string> help;
    const lambda::function<Future<Response>(const Request&)> handler;
  };

  // Owned by this Route; created in the constructor and deleted in
  // the destructor.
  // NOTE(review): Route declares no copy constructor / assignment
  // operator, so it must not be copied (both copies would terminate
  // and delete the same process).
  RouteProcess* process;
};


// Keeps track of sockets, the encoders queued for sending on them,
// the HTTP proxies associated with them and the links between
// processes (see the member comments below). Only the declaration is
// given here.
class SocketManager
{
public:
  SocketManager();
  ~SocketManager();

  Socket accepted(int s);

  void link(ProcessBase* process, const UPID& to);

  PID<HttpProxy> proxy(const Socket& socket);

  void send(Encoder* encoder, bool persist);
  void send(const Response& response,
            const Request& request,
            const Socket& socket);
  void send(Message* message);

  // Callers in this file treat a NULL result as "nothing more to send
  // on socket 's' right now".
  Encoder* next(int s);

  void close(int s);

  void exited(const Node& node);
  void exited(ProcessBase* process);

private:
  // TODO(bmahler): Leverage a bidirectional multimap instead, or
  // hide the complexity of manipulating 'links' through methods.
  struct
  {
    // For links, we maintain a bidirectional mapping between the
    // "linkers" (Processes) and the "linkees" (remote / local UPIDs).
    // For remote nodes, we also need a mapping to the linkees on the
    // node, because socket closure only notifies at the node level.
    hashmap<UPID, hashset<ProcessBase*>> linkers;
    hashmap<ProcessBase*, hashset<UPID>> linkees;
    hashmap<Node, hashset<UPID>> remotes;
  } links;

  // Collection of all active sockets.
  map<int, Socket> sockets;

  // Collection of sockets that should be disposed when they are
  // finished being used (e.g., when there is no more data to send on
  // them).
  set<int> dispose;

  // Map from socket to node (ip, port).
  map<int, Node> nodes;

  // Maps from node (ip, port) to temporary sockets (i.e., they will
  // get closed once there is no more data to send on them).
  map<Node, int> temps;

  // Maps from node (ip, port) to persistent sockets (i.e., they will
  // remain open even if there is no more data to send on them).  We
  // distinguish these from the 'temps' collection so we can tell when
  // a persistent socket has been lost (and thus generate
  // ExitedEvents).
  map<Node, int> persists;

  // Map from socket to outgoing queue.
  map<int, queue<Encoder*> > outgoing;

  // HTTP proxies.
  map<int, HttpProxy*> proxies;

  // Protects instance variables.
  synchronizable(this);
};


// Keeps track of the locally spawned processes, the queue of runnable
// processes and the gates used by waiting threads (see the member
// comments below). Only the declaration is given here.
class ProcessManager
{
public:
  // 'delegate' is the name of the process that receives root HTTP
  // requests (see the 'delegate' member).
  explicit ProcessManager(const string& delegate);
  ~ProcessManager();

  ProcessReference use(const UPID& pid);

  bool handle(
      const Socket& socket,
      Request* request);

  bool deliver(
      ProcessBase* receiver,
      Event* event,
      ProcessBase* sender = NULL);

  bool deliver(
      const UPID& to,
      Event* event,
      ProcessBase* sender = NULL);

  UPID spawn(ProcessBase* process, bool manage);
  void resume(ProcessBase* process);
  void cleanup(ProcessBase* process);
  void link(ProcessBase* process, const UPID& to);
  void terminate(const UPID& pid, bool inject, ProcessBase* sender = NULL);
  bool wait(const UPID& pid);

  void enqueue(ProcessBase* process);
  ProcessBase* dequeue();

  // Invoked by Clock::settle().
  void settle();

  // The /__processes__ route.
  Future<Response> __processes__(const Request&);

private:
  // Delegate process name to receive root HTTP requests.
  const string delegate;

  // Map of all local spawned and running processes.
  map<string, ProcessBase*> processes;
  synchronizable(processes);

  // Gates for waiting threads (protected by synchronizable(processes)).
  map<ProcessBase*, Gate*> gates;

  // Queue of runnable processes (implemented using list).
  list<ProcessBase*> runq;
  synchronizable(runq);

  // Number of running processes, to support Clock::settle operation.
  int running;
};


// Help strings.
// Help text for the /logging/toggle endpoint.
const string Logging::TOGGLE_HELP = HELP(
    TLDR(
        "Sets the logging verbosity level for a specified duration."),
    USAGE(
        "/logging/toggle?level=VALUE&duration=VALUE"),
    DESCRIPTION(
        "The libprocess library uses [glog][glog] for logging. The library",
        "only uses verbose logging which means nothing will be output unless",
        // The trailing comma matters: without it this literal and the
        // next one are concatenated into "...libprocess useslevels...".
        "the verbosity level is set (by default it's 0, libprocess uses",
        "levels 1, 2, and 3).",
        "",
        "**NOTE:** If your application uses glog this will also affect",
        "your verbose logging.",
        "",
        "Required query parameters:",
        "",
        ">        level=VALUE          Verbosity level (e.g., 1, 2, 3)",
        ">        duration=VALUE       Duration to keep verbosity level",
        ">                             toggled (e.g., 10secs, 15mins, etc.)"),
    REFERENCES(
        "[glog]: https://code.google.com/p/google-glog"));


// Help text for the /profiler/start endpoint.
// NOTE(review): the TLDR, usage and description below are still
// placeholder text ("...") and should be filled in.
const string Profiler::START_HELP = HELP(
    TLDR(
        "Starts profiling ..."),
    USAGE(
        "/profiler/start..."),
    DESCRIPTION(
        "...",
        "",
        "Query parameters:",
        "",
        ">        param=VALUE          Some description here"));


// Help text for the /profiler/stop endpoint.
// NOTE(review): the TLDR, usage and description below are still
// placeholder text ("...") and should be filled in.
const string Profiler::STOP_HELP = HELP(
    TLDR(
        "Stops profiling ..."),
    USAGE(
        "/profiler/stop..."),
    DESCRIPTION(
        "...",
        "",
        "Query parameters:",
        "",
        ">        param=VALUE          Some description here"));


// Unique id that can be assigned to each process.
static uint32_t __id__ = 0;

// Local server socket.
static int __s__ = -1;

// Local IP address.
static uint32_t __ip__ = 0;

// Local port.
static uint16_t __port__ = 0;

// Active SocketManager (eventually will probably be thread-local).
static SocketManager* socket_manager = NULL;

// Active ProcessManager (eventually will probably be thread-local).
static ProcessManager* process_manager = NULL;

// Event loop.
static struct ev_loop* loop = NULL;

// Asynchronous watcher for interrupting loop.
static ev_async async_watcher;

// Watcher for timeouts.
static ev_timer timeouts_watcher;

// Server watcher for accepting connections.
static ev_io server_watcher;

// Queue of I/O watchers to be asynchronously added to the event loop
// (protected by 'watchers' below).
// TODO(benh): Replace this queue with functions that we put in
// 'functions' below that perform the ev_io_start themselves.
static queue<ev_io*>* watchers = new queue<ev_io*>();
static synchronizable(watchers) = SYNCHRONIZED_INITIALIZER;

// Queue of functions to be invoked asynchronously within the event
// loop (protected by the 'watchers' lock above).
static queue<lambda::function<void(void)> >* functions =
  new queue<lambda::function<void(void)> >();

// We store the timers in a map of lists indexed by the timeout of the
// timer so that we can have two timers that have the same timeout. We
// exploit that the map is SORTED!
static map<Time, list<Timer> >* timeouts = new map<Time, list<Timer> >();
static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;

// For supporting Clock::settle(), true if timers have been removed
// from 'timeouts' but may not have been executed yet. Protected by
// the timeouts lock. This is only used when the clock is paused.
static bool pending_timers = false;

// Flag to indicate whether or not to update the timer on async
// interrupt.
static bool update_timer = false;

// Scheduling gate that threads wait at when there is nothing to run.
static Gate* gate = new Gate();

// Filter. Synchronized support for using the filterer needs to be
// recursive in case a filterer wants to do anything fancy (which is
// possible and likely given that filters will get used for testing).
static Filter* filterer = NULL;
static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;

// Global garbage collector.
PID<GarbageCollector> gc;

// Global help.
PID<Help> help;

// Per thread process pointer.
ThreadLocal<ProcessBase>* _process_ = new ThreadLocal<ProcessBase>();

// Per thread executor pointer.
ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();

// TODO(dhamon): Reintroduce this when it is plumbed through to Statistics.
// const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);


// We namespace the clock related variables to keep them well
// named. In the future we'll probably want to associate a clock with
// a specific ProcessManager/SocketManager instance pair, so this will
// likely change.
namespace clock {

// Per-process current times, used while the clock is paused (cleared
// when the clock is resumed).
map<ProcessBase*, Time>* currents = new map<ProcessBase*, Time>();

// TODO(dhamon): These static non-POD instances should be replaced by pointers
// or functions.
// 'initial' is the time at which the clock was paused; 'current' is
// the global time while paused (moved forward by advance/update).
Time initial = Time::epoch();
Time current = Time::epoch();

// Total amount the paused clock has been moved forward; added to
// every Time built by Time::create.
Duration advanced = Duration::zero();

// Whether the clock is currently paused.
bool paused = false;

} // namespace clock {


// Returns the current time for the process referred to by
// '__process__' (see the overload taking a ProcessBase*).
Time Clock::now()
{
  ProcessBase* process = __process__;
  return now(process);
}


// Returns the current time as seen by 'process'.
//
// While the clock is paused this is the time tracked for 'process' in
// 'clock::currents' (initialized to 'clock::initial' on first use),
// or 'clock::current' when 'process' is NULL. Otherwise the time is
// read from libev via ev_time().
Time Clock::now(ProcessBase* process)
{
  synchronized (timeouts) {
    if (Clock::paused()) {
      if (process == NULL) {
        return clock::current;
      }

      // Remember whether the process was already tracked before
      // operator[] (possibly) creates its entry.
      const bool tracked = clock::currents->count(process) != 0;

      Time& time = (*clock::currents)[process];
      if (!tracked) {
        time = clock::initial;
      }

      return time;
    }
  }

  // TODO(benh): Versus ev_now()?
  double seconds = ev_time();

  // Compensates for clock::advanced.
  Try<Time> result = Time::create(seconds);

  // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
  // here.
  if (result.isError()) {
    LOG(FATAL) << "Failed to create a Time from " << seconds << ": "
               << result.error();
  }

  return result.get();
}


// Pauses the clock (no-op if it is already paused), recording the
// time of the pause as both the initial and the current time.
void Clock::pause()
{
  process::initialize(); // To make sure the libev watchers are ready.

  synchronized (timeouts) {
    if (!clock::paused) {
      // Must be sampled before 'paused' is set, since now() answers
      // from the paused state once the flag is true.
      const Time time = now();
      clock::current = time;
      clock::initial = time;
      clock::paused = true;
      VLOG(2) << "Clock paused at " << clock::initial;
    }
  }

  // Note that after pausing the clock an existing libev timer might
  // still fire (invoking handle_timeout), but since paused == true no
  // "time" will actually have passed, so no timer will actually fire.
}


// Returns whether the clock is currently paused. Note that the flag
// is read here without taking the 'timeouts' lock.
bool Clock::paused()
{
  return clock::paused;
}


// Resumes a paused clock (no-op if the clock is not paused): drops
// all per-process times and asks the event loop to recompute the
// timeout watcher.
void Clock::resume()
{
  process::initialize(); // To make sure the libev watchers are ready.

  synchronized (timeouts) {
    if (clock::paused) {
      VLOG(2) << "Clock resumed at " << clock::current;
      clock::paused = false;
      clock::currents->clear();
      // Have handle_async re-arm the timer.
      update_timer = true;
      ev_async_send(loop, &async_watcher);
    }
  }
}


// Moves the paused global clock forward by 'duration'. Does nothing
// if the clock is not paused.
void Clock::advance(const Duration& duration)
{
  synchronized (timeouts) {
    if (clock::paused) {
      clock::advanced += duration;
      clock::current += duration;
      VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
      // Only interrupt the event loop if a timer update is not
      // already pending.
      if (!update_timer) {
        update_timer = true;
        ev_async_send(loop, &async_watcher);
      }
    }
  }
}


// Moves the time tracked for 'process' forward by 'duration' while
// the clock is paused. Does nothing if the clock is not paused. The
// global clock is left untouched.
// NOTE(review): 'process' is dereferenced for logging, so it is
// presumably expected to be non-NULL -- confirm against callers.
void Clock::advance(ProcessBase* process, const Duration& duration)
{
  synchronized (timeouts) {
    if (clock::paused) {
      Time current = now(process);
      current += duration;
      (*clock::currents)[process] = current;
      VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
              << ") to " << current;
    }
  }
}


// Moves the paused global clock forward to 'time'. Does nothing if
// the clock is not paused or if 'time' is not later than the current
// time (the clock never goes backwards here).
void Clock::update(const Time& time)
{
  synchronized (timeouts) {
    if (clock::paused && clock::current < time) {
      clock::advanced += (time - clock::current);
      clock::current = time;
      VLOG(2) << "Clock updated to " << clock::current;

      // Only interrupt the event loop if a timer update is not
      // already pending.
      if (!update_timer) {
        update_timer = true;
        ev_async_send(loop, &async_watcher);
      }
    }
  }
}


// Moves the time tracked for 'process' forward to 'time' while the
// clock is paused. Does nothing if the clock is not paused or if the
// process' time is already at or past 'time'.
void Clock::update(ProcessBase* process, const Time& time)
{
  synchronized (timeouts) {
    // now(process) is only consulted when the clock is paused.
    if (clock::paused && now(process) < time) {
      VLOG(2) << "Clock of " << process->self() << " updated to " << time;
      (*clock::currents)[process] = time;
    }
  }
}


// Makes sure the time of 'to' is not earlier than the time of 'from'
// by updating 'to' with the current time of 'from'.
void Clock::order(ProcessBase* from, ProcessBase* to)
{
  const Time time = now(from);
  update(to, time);
}


// Delegates to ProcessManager::settle(). The clock must be paused;
// otherwise the CHECK below fails.
void Clock::settle()
{
  CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
  process_manager->settle();
}


// Creates a Time from a number of seconds, shifted by the amount the
// clock has been advanced. Returns an Error if 'seconds' can not be
// turned into a Duration.
Try<Time> Time::create(double seconds)
{
  Try<Duration> duration = Duration::create(seconds);

  if (duration.isError()) {
    return Error("Argument too large for Time: " + duration.error());
  }

  // In production code, clock::advanced will always be zero!
  return Time(duration.get() + clock::advanced);
}


// Allocates a new Message with the given sender, receiver, name and
// body. The message is created with 'new'; the caller receives the
// raw pointer.
static Message* encode(const UPID& from,
                       const UPID& to,
                       const string& name,
                       const string& data = "")
{
  Message* result = new Message();

  result->name = name;
  result->body = data;
  result->from = from;
  result->to = to;

  return result;
}


// Routes 'message' to its destination: messages addressed to this
// node (same ip and port) are delivered through the process manager,
// everything else is sent through the socket manager.
static void transport(Message* message, ProcessBase* sender = NULL)
{
  const bool local =
    message->to.ip == __ip__ && message->to.port == __port__;

  if (!local) {
    // Remote message.
    socket_manager->send(message);
    return;
  }

  // Local message.
  process_manager->deliver(message->to, new MessageEvent(message), sender);
}


// Returns true if 'request' looks like a libprocess message, i.e., it
// is a POST that either has a User-Agent starting with "libprocess/"
// or carries a Libprocess-From header.
static bool libprocess(Request* request)
{
  if (request->method != "POST") {
    return false;
  }

  if (request->headers.contains("User-Agent") &&
      request->headers["User-Agent"].find("libprocess/") == 0) {
    return true;
  }

  return request->headers.contains("Libprocess-From");
}


// Builds a Message from an HTTP request carrying a libprocess
// message.
//
// The sender is taken from the Libprocess-From header or, failing
// that, from whatever follows "libprocess/" in the User-Agent. The
// request path has the form "/<id>[/<name>]": the (percent-decoded)
// <id> addresses a process on this node and <name> becomes the
// message name (empty if absent). The request body becomes the
// message body.
//
// Returns a newly allocated Message, or NULL if the sender can not be
// determined or the path can not be decoded.
static Message* parse(Request* request)
{
  // TODO(benh): Do better error handling (to deal with a malformed
  // libprocess message, malicious or otherwise).

  // First try and determine 'from'.
  Option<UPID> from = None();

  if (request->headers.contains("Libprocess-From")) {
    from = UPID(strings::trim(request->headers["Libprocess-From"]));
  } else {
    // Try and get 'from' from the User-Agent.
    const string& agent = request->headers["User-Agent"];
    const string& identifier = "libprocess/";
    size_t index = agent.find(identifier);
    if (index != string::npos) {
      from = UPID(agent.substr(index + identifier.size(), agent.size()));
    }
  }

  if (from.isNone()) {
    return NULL;
  }

  // Now determine 'to'. 'index' becomes the length of the id, i.e.,
  // the number of characters between the leading '/' and the next
  // '/' (or npos if there is no second '/').
  size_t index = request->path.find('/', 1);
  index = index != string::npos ? index - 1 : string::npos;

  // Decode possible percent-encoded 'to'.
  Try<string> decode = http::decode(request->path.substr(1, index));

  if (decode.isError()) {
    // Report the error message; calling get() on a failed Try is
    // invalid.
    VLOG(2) << "Failed to decode URL path: " << decode.error();
    return NULL;
  }

  const UPID to(decode.get(), __ip__, __port__);

  // And now determine 'name': everything after the second '/', or the
  // empty string if there is none.
  index = index != string::npos ? index + 2: request->path.size();
  const string& name = request->path.substr(index);

  VLOG(2) << "Parsed message name '" << name
          << "' for " << to << " from " << from.get();

  Message* message = new Message();
  message->name = name;
  message->from = from.get();
  message->to = to;
  message->body = request->body;

  return message;
}

// Wrapper around function we want to run in the event loop.
template <typename T>
void _run_in_event_loop(
    const lambda::function<Future<T>(void)>& f,
    const Owned<Promise<T> >& promise)
{
  // Invoke 'f' and hand its result to the promise, unless a discard
  // has been requested on the future in the meantime.
  if (!promise->future().hasDiscard()) {
    promise->set(f());
    return;
  }

  // Don't bother running the function if the future has been discarded.
  promise->discard();
}


// Helper for running a function in the event loop.
template <typename T>
Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
{
  // The promise is shared with the bound function below so that it
  // stays alive until the function has been run.
  Owned<Promise<T> > promise(new Promise<T>());

  Future<T> future = promise->future();

  // Enqueue the function ('functions' is protected by the 'watchers'
  // lock).
  synchronized (watchers) {
    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
  }

  // Interrupt the loop (handle_async drains 'functions').
  ev_async_send(loop, &async_watcher);

  return future;
}


// libev callback for 'async_watcher'. Starts all queued I/O watchers,
// runs all queued functions and, if requested via 'update_timer',
// re-arms the timeouts watcher for the earliest pending timeout.
void handle_async(struct ev_loop* loop, ev_async* _, int revents)
{
  synchronized (watchers) {
    // Start all the new I/O watchers.
    while (!watchers->empty()) {
      ev_io* watcher = watchers->front();
      watchers->pop();
      ev_io_start(loop, watcher);
    }

    // Run the queued functions (note: while holding the 'watchers'
    // lock).
    while (!functions->empty()) {
      (functions->front())();
      functions->pop();
    }
  }

  synchronized (timeouts) {
    if (update_timer) {
      if (!timeouts->empty()) {
        // Determine when the next timer should fire.
        timeouts_watcher.repeat =
          (timeouts->begin()->first - Clock::now()).secs();

        if (timeouts_watcher.repeat <= 0) {
          // Feed the event now!
          timeouts_watcher.repeat = 0;
          ev_timer_again(loop, &timeouts_watcher);
          ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
        } else {
          // Don't fire the timer if the clock is paused since we
          // don't want time to advance (instead a call to
          // clock::advance() will handle the timer).
          // (The 'repeat > 0' test is always true in this branch.)
          if (Clock::paused() && timeouts_watcher.repeat > 0) {
            timeouts_watcher.repeat = 0;
          }

          ev_timer_again(loop, &timeouts_watcher);
        }
      }

      update_timer = false;
    }
  }
}


// libev callback for 'timeouts_watcher'. Collects every timer whose
// timeout is not later than the current time, removes them from
// 'timeouts', re-arms the watcher for the next pending timeout and
// finally invokes the collected timers (outside of the lock).
void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
{
  // Timers that have timed out, in timeout order.
  list<Timer> timedout;

  synchronized (timeouts) {
    Time now = Clock::now();

    VLOG(3) << "Handling timeouts up to " << now;

    // 'timeouts' is sorted, so we can stop at the first timeout that
    // is still in the future.
    foreachkey (const Time& timeout, *timeouts) {
      if (timeout > now) {
        break;
      }

      VLOG(3) << "Have timeout(s) at " << timeout;

      // Record that we have pending timers to execute so the
      // Clock::settle() operation can wait until we're done.
      pending_timers = true;

      foreach (const Timer& timer, (*timeouts)[timeout]) {
        timedout.push_back(timer);
      }
    }

    // Now erase the range of timeouts that timed out.
    timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));

    // Okay, so the timeout for the next timer should not have fired.
    CHECK(timeouts->empty() || (timeouts->begin()->first > now));

    // Update the timer as necessary.
    if (!timeouts->empty()) {
      // Determine when the next timer should fire.
      timeouts_watcher.repeat =
        (timeouts->begin()->first - Clock::now()).secs();

      if (timeouts_watcher.repeat <= 0) {
        // Feed the event now!
        timeouts_watcher.repeat = 0;
        ev_timer_again(loop, &timeouts_watcher);
        ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
      } else {
        // Don't fire the timer if the clock is paused since we don't
        // want time to advance (instead a call to Clock::advance()
        // will handle the timer).
        // (The 'repeat > 0' test is always true in this branch.)
        if (Clock::paused() && timeouts_watcher.repeat > 0) {
          timeouts_watcher.repeat = 0;
        }

        ev_timer_again(loop, &timeouts_watcher);
      }
    }

    update_timer = false; // Since we might have a queued update_timer.
  }

  // Update current time of process (if it's present/valid). It might
  // be necessary to actually add some more synchronization around
  // this so that, for example, pausing and resuming the clock doesn't
  // cause some processes to get their current times updated and
  // others not. Since ProcessManager::use acquires the 'processes'
  // lock we had to move this out of the synchronized (timeouts) above
  // since there was a deadlock with acquiring 'processes' then
  // 'timeouts' (reverse order) in ProcessManager::cleanup. Note that
  // current time may be greater than the timeout if a local message
  // was received (and happens-before kicks in).
  if (Clock::paused()) {
    foreach (const Timer& timer, timedout) {
      if (ProcessReference process = process_manager->use(timer.creator())) {
        Clock::update(process, timer.timeout().time());
      }
    }
  }

  // Invoke the timers that timed out (TODO(benh): Do this
  // asynchronously so that we don't tie up the event thread!).
  foreach (const Timer& timer, timedout) {
    timer();
  }

  // Mark ourselves as done executing the timers since it's now safe
  // for a call to Clock::settle() to check if there will be any
  // future timeouts reached.
  synchronized (timeouts) {
    pending_timers = false;
  }
}


// libev read callback for sockets that carry HTTP requests. Reads
// until the socket would block, feeds the bytes to the DataDecoder
// stored in 'watcher->data' and hands every decoded request to the
// process manager. On a socket error, a closed socket or a decoder
// failure the socket is closed and the decoder and watcher are
// destroyed.
void recv_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
  DataDecoder* decoder = (DataDecoder*) watcher->data;

  const int fd = watcher->fd;

  for (;;) {
    const ssize_t capacity = 80 * 1024;

    char buffer[capacity];

    const ssize_t received = recv(fd, buffer, capacity, 0);

    if (received < 0) {
      if (errno == EINTR) {
        // Interrupted, try again now.
        continue;
      }

      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // Might block, try again later.
        return;
      }
    }

    if (received <= 0) {
      // Socket error or closed.
      if (received < 0) {
        const char* error = strerror(errno);
        VLOG(1) << "Socket error while receiving: " << error;
      } else {
        VLOG(2) << "Socket closed while receiving";
      }
      socket_manager->close(fd);
      delete decoder;
      ev_io_stop(loop, watcher);
      delete watcher;
      return;
    }

    CHECK(received > 0);

    // Decode as much of the data as possible into HTTP requests.
    const deque<Request*>& requests = decoder->decode(buffer, received);

    if (!requests.empty()) {
      foreach (Request* request, requests) {
        process_manager->handle(decoder->socket(), request);
      }
      continue;
    }

    // No complete request yet; give up on the socket only if the
    // decoder reports a failure.
    if (decoder->failed()) {
      VLOG(1) << "Decoder error while receiving";
      socket_manager->close(fd);
      delete decoder;
      ev_io_stop(loop, watcher);
      delete watcher;
      return;
    }
  }
}


// A variant of 'recv_data' that doesn't do anything with the
// data. Used by sockets created via SocketManager::link as well as
// SocketManager::send(Message) where we don't care about the data
// received we mostly just want to know when the socket has been
// closed.
void ignore_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
  // The Socket stored in the watcher is released once the socket has
  // been closed (or has failed).
  Socket* socket = (Socket*) watcher->data;

  const int fd = watcher->fd;

  for (;;) {
    const ssize_t capacity = 80 * 1024;

    char buffer[capacity];

    const ssize_t received = recv(fd, buffer, capacity, 0);

    if (received < 0) {
      if (errno == EINTR) {
        // Interrupted, try again now.
        continue;
      }

      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // Might block, try again later.
        return;
      }
    }

    if (received > 0) {
      // Drop the data and keep draining the socket.
      VLOG(2) << "Ignoring " << received << " bytes of data received "
              << "on socket used only for sending";
      continue;
    }

    // Socket error or closed.
    if (received < 0) {
      const char* error = strerror(errno);
      VLOG(1) << "Socket error while receiving: " << error;
    } else {
      VLOG(2) << "Socket closed while receiving";
    }
    socket_manager->close(fd);
    ev_io_stop(loop, watcher);
    delete socket;
    delete watcher;
    return;
  }
}


// libev write callback that sends the data of the DataEncoder stored
// in 'watcher->data'. Sends until the socket would block or the
// encoder has nothing left; in the latter case the encoder is
// destroyed and the watcher is either re-armed for the next encoder
// queued on the socket or destroyed. On a socket error or a closed
// socket the socket is closed and encoder and watcher are destroyed.
void send_data(struct ev_loop* loop, ev_io* watcher, int revents)
{
  DataEncoder* encoder = (DataEncoder*) watcher->data;

  const int fd = watcher->fd;

  for (;;) {
    size_t available;
    const void* chunk = encoder->next(&available);
    CHECK(available > 0);

    const ssize_t sent = send(fd, chunk, available, MSG_NOSIGNAL);

    if (sent < 0) {
      if (errno == EINTR) {
        // Interrupted, try again now.
        encoder->backup(available);
        continue;
      }

      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // Might block, try again later.
        encoder->backup(available);
        return;
      }
    }

    if (sent <= 0) {
      // Socket error or closed.
      if (sent < 0) {
        const char* error = strerror(errno);
        VLOG(1) << "Socket error while sending: " << error;
      } else {
        VLOG(1) << "Socket closed while sending";
      }
      socket_manager->close(fd);
      delete encoder;
      ev_io_stop(loop, watcher);
      delete watcher;
      return;
    }

    CHECK(sent > 0);

    // Update the encoder with the amount sent (i.e., give back
    // whatever part of the chunk did not go out).
    encoder->backup(available - sent);

    // Keep sending while there is more of the message left.
    if (encoder->remaining() != 0) {
      continue;
    }

    delete encoder;

    // Stop this watcher for now.
    ev_io_stop(loop, watcher);

    // Check for more stuff to send on socket.
    Encoder* next = socket_manager->next(fd);
    if (next == NULL) {
      // Nothing more to send right now, clean up.
      delete watcher;
      return;
    }

    watcher->data = next;
    ev_io_init(watcher, next->sender(), fd, EV_WRITE);
    ev_io_start(loop, watcher);
    return;
  }
}


// libev write callback that sends the file of the FileEncoder stored
// in 'watcher->data' using os::sendfile. Sends until the socket would
// block or the encoder has nothing left; in the latter case the
// encoder is destroyed and the watcher is either re-armed for the
// next encoder queued on the socket or destroyed. On a socket error
// or a closed socket the socket is closed and encoder and watcher are
// destroyed.
void send_file(struct ev_loop* loop, ev_io* watcher, int revents)
{
  FileEncoder* encoder = (FileEncoder*) watcher->data;

  const int s = watcher->fd;

  for (;;) {
    off_t offset;
    size_t available;
    const int file = encoder->next(&offset, &available);
    CHECK(available > 0);

    const ssize_t sent = os::sendfile(s, file, offset, available);

    if (sent < 0) {
      if (errno == EINTR) {
        // Interrupted, try again now.
        encoder->backup(available);
        continue;
      }

      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // Might block, try again later.
        encoder->backup(available);
        return;
      }
    }

    if (sent <= 0) {
      // Socket error or closed.
      if (sent < 0) {
        const char* error = strerror(errno);
        VLOG(1) << "Socket error while sending: " << error;
      } else {
        VLOG(1) << "Socket closed while sending";
      }
      socket_manager->close(s);
      delete encoder;
      ev_io_stop(loop, watcher);
      delete watcher;
      return;
    }

    CHECK(sent > 0);

    // Update the encoder with the amount sent (i.e., give back
    // whatever part of the range did not go out).
    encoder->backup(available - sent);

    // Keep sending while there is more of the file left.
    if (encoder->remaining() != 0) {
      continue;
    }

    delete encoder;

    // Stop this watcher for now.
    ev_io_stop(loop, watcher);

    // Check for more stuff to send on socket.
    Encoder* next = socket_manager->next(s);
    if (next == NULL) {
      // Nothing more to send right now, clean up.
      delete watcher;
      return;
    }

    watcher->data = next;
    ev_io_init(watcher, next->sender(), s, EV_WRITE);
    ev_io_start(loop, watcher);
    return;
  }
}


void sending_connect(struct ev_loop* loop, ev_io* watcher, int revents)
{
  int s = watcher->fd;

  // Now check that a successful connection was made.
  int opt;
  socklen_t optlen = sizeof(opt);

  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
    // Connect failure.
    VLOG(1) << "Socket error while connecting";
    socket_manager->close(s);
    MessageEncoder* encoder = (MessageEncoder*) watcher->data;
    delete encoder;
    ev_io_stop(loop, watcher);
    delete watcher;
  } else {
    // We're connected! Now let's do some sending.
    ev_io_stop(loop, watcher);
    ev_io_init(watcher, send_data, s, EV_WRITE);
    ev_io_start(loop, watcher);
  }
}


void receiving_connect(struct ev_loop* loop, ev_io* watcher, int revents)
{
  int s = watcher->fd;

  // Now check that a successful connection was made.
  int opt;
  socklen_t optlen = sizeof(opt);

  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
    // Connect failure.
    VLOG(1) << "Socket error while connecting";
    socket_manager->close(s);
    Socket* socket = (Socket*) watcher->data;
    delete socket;
    ev_io_stop(loop, watcher);
    delete watcher;
  } else {
    // We're connected! Now let's do some receiving.
    ev_io_stop(loop, watcher);
    ev_io_init(watcher, ignore_data, s, EV_READ);
    ev_io_start(loop, watcher);
  }
}


void accept(struct ev_loop* loop, ev_io* watcher, int revents)
{
  CHECK_EQ(__s__, watcher->fd);

  sockaddr_in addr;
  socklen_t addrlen = sizeof(addr);

  int s = ::accept(__s__, (sockaddr*) &addr, &addrlen);

  if (s < 0) {
    return;
  }

  Try<Nothing> nonblock = os::nonblock(s);
  if (nonblock.isError()) {
    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
                                << nonblock.error();
    os::close(s);
    return;
  }

  Try<Nothing> cloexec = os::cloexec(s);
  if (cloexec.isError()) {
    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
                                << cloexec.error();
    os::close(s);
    return;
  }

  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
  int on = 1;
  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
    const char* error = strerror(errno);
    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
    os::close(s);
  } else {
    // Inform the socket manager for proper bookkeeping.
    const Socket& socket = socket_manager->accepted(s);

    // Allocate and initialize the decoder and watcher.
    DataDecoder* decoder = new DataDecoder(socket);

    ev_io* watcher = new ev_io();
    watcher->data = decoder;

    ev_io_init(watcher, recv_data, s, EV_READ);
    ev_io_start(loop, watcher);
  }
}


// Data necessary for polling so we can discard polling and actually
// stop it in the event loop.
struct Poll
{
  Poll()
  {
    // Need to explicitly instantiate the watchers.
    watcher.io.reset(new ev_io());
    watcher.async.reset(new ev_async());
  }

  // An I/O watcher for checking for readability or writeability and
  // an async watcher for being able to discard the polling.
  struct {
    memory::shared_ptr<ev_io> io;
    memory::shared_ptr<ev_async> async;
  } watcher;

  Promise<short> promise;
};


// Event loop callback when I/O is ready on polling file descriptor.
// 'watcher->data' is the Poll that owns both watchers; this callback
// stops them, completes the promise with 'revents' and deletes the
// Poll.
void polled(struct ev_loop* loop, ev_io* watcher, int revents)
{
  Poll* poll = (Poll*) watcher->data;

  // Only one notification is wanted per poll.
  ev_io_stop(loop, poll->watcher.io.get());

  // Stop the async watcher (also clears if pending so 'discard_poll'
  // will not get invoked and we can delete 'poll' here).
  ev_async_stop(loop, poll->watcher.async.get());

  // Hand the events that fired to whoever holds the future.
  poll->promise.set(revents);

  delete poll;
}


// Event loop callback when future associated with polling file
// descriptor has been discarded. 'watcher->data' is the Poll that
// owns both watchers. Unless a 'polled' callback is already pending,
// this stops both watchers, discards the promise and deletes the
// Poll.
void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
{
  Poll* poll = (Poll*) watcher->data;

  // Check and see if we have a pending 'polled' callback and if so
  // let it "win".
  if (ev_is_pending(poll->watcher.io.get())) {
    return;
  }

  ev_async_stop(loop, poll->watcher.async.get());

  // Stop the I/O watcher (but note we check if pending above) so it
  // won't get invoked and we can delete 'poll' here.
  ev_io_stop(loop, poll->watcher.io.get());

  poll->promise.discard();

  delete poll;
}


void* serve(void* arg)
{
  ev_loop(((struct ev_loop*) arg), 0);

  return NULL;
}


// Thread entry point for a worker thread: repeatedly dequeues a
// process from the process manager and resumes it, waiting at the
// gate whenever there is nothing to run. Never returns.
void* schedule(void* arg)
{
  for (;;) {
    ProcessBase* process = process_manager->dequeue();

    if (process == NULL) {
      // Nothing to run. Approach the gate and then look again before
      // actually waiting.
      Gate::state_t old = gate->approach();
      process = process_manager->dequeue();

      if (process == NULL) {
        gate->arrive(old); // Wait at gate if idle.
        continue;
      }

      // Found something on the second look, so don't wait.
      gate->leave();
    }

    process_manager->resume(process);
  }
}


// We might find value in catching terminating signals at some point.
// However, for now, adding signal handlers freely is not allowed
// because they will clash with Java and Python virtual machines and
// cause hard-to-debug crashes/segfaults.

// void sigbad(int signal, struct sigcontext *ctx)
// {
//   // Pass on the signal (so that a core file is produced).
//   struct sigaction sa;
//   sa.sa_handler = SIG_DFL;
//   sigemptyset(&sa.sa_mask);
//   sa.sa_flags = 0;
//   sigaction(signal, &sa, NULL);
//   raise(signal);
// }


// Initializes libprocess. Only the first caller performs the
// initialization; concurrent or later callers wait until it has
// completed and then return.
//
// The first caller:
//   - creates the global ProcessManager (with 'delegate') and
//     SocketManager,
//   - starts the worker threads (running 'schedule'),
//   - creates, binds and listens on the server socket, honoring the
//     LIBPROCESS_IP and LIBPROCESS_PORT environment variables,
//   - sets up the event loop and starts a thread running it ('serve'),
//   - spawns the global processes and adds the '/__processes__' route.
//
// Any failure during initialization is fatal (LOG(FATAL)/PLOG(FATAL)).
void initialize(const string& delegate)
{
  // TODO(benh): Return an error if attempting to initialize again
  // with a different delegate then originally specified.

  // static pthread_once_t init = PTHREAD_ONCE_INIT;
  // pthread_once(&init, ...);

  // 'initialized' is flipped (via compare-and-swap) by the one caller
  // that gets to do the initialization, 'initializing' stays true
  // until that caller has finished the part others must wait for.
  static volatile bool initialized = false;
  static volatile bool initializing = true;

  // Try and do the initialization or wait for it to complete.
  if (initialized && !initializing) {
    return;
  } else if (initialized && initializing) {
    while (initializing);
    return;
  } else {
    if (!__sync_bool_compare_and_swap(&initialized, false, true)) {
      while (initializing);
      return;
    }
  }

//   // Install signal handler.
//   struct sigaction sa;

//   sa.sa_handler = (void (*) (int)) sigbad;
//   sigemptyset (&sa.sa_mask);
//   sa.sa_flags = SA_RESTART;

//   sigaction (SIGTERM, &sa, NULL);
//   sigaction (SIGINT, &sa, NULL);
//   sigaction (SIGQUIT, &sa, NULL);
//   sigaction (SIGSEGV, &sa, NULL);
//   sigaction (SIGILL, &sa, NULL);
// #ifdef SIGBUS
//   sigaction (SIGBUS, &sa, NULL);
// #endif
// #ifdef SIGSTKFLT
//   sigaction (SIGSTKFLT, &sa, NULL);
// #endif
//   sigaction (SIGABRT, &sa, NULL);

//   sigaction (SIGFPE, &sa, NULL);

#ifdef __sun__
  /* Need to ignore this since we can't do MSG_NOSIGNAL on Solaris. */
  signal(SIGPIPE, SIG_IGN);
#endif // __sun__

  // Create a new ProcessManager and SocketManager.
  process_manager = new ProcessManager(delegate);
  socket_manager = new SocketManager();

  // Setup processing threads.
  // We create no fewer than 8 threads because some tests require
  // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' on
  // computers with fewer cores.
  // e.g. https://issues.apache.org/jira/browse/MESOS-818
  //
  // TODO(xujyan): Use a smarter algorithm to allocate threads.
  // Allocating a static number of threads can cause starvation if
  // there are more waiting Processes than the number of worker
  // threads.
  long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN));

  for (int i = 0; i < cpus; i++) {
    pthread_t thread; // For now, not saving handles on our threads.
    if (pthread_create(&thread, NULL, schedule, NULL) != 0) {
      LOG(FATAL) << "Failed to initialize, pthread_create";
    }
  }

  // Default to any address and an ephemeral port unless overridden
  // through the environment below.
  __ip__ = 0;
  __port__ = 0;

  char* value;

  // Check environment for ip.
  value = getenv("LIBPROCESS_IP");
  if (value != NULL) {
    int result = inet_pton(AF_INET, value, &__ip__);
    if (result == 0) {
      LOG(FATAL) << "LIBPROCESS_IP=" << value << " was unparseable";
    } else if (result < 0) {
      PLOG(FATAL) << "Failed to initialize, inet_pton";
    }
  }

  // Check environment for port.
  value = getenv("LIBPROCESS_PORT");
  if (value != NULL) {
    int result = atoi(value);
    if (result < 0 || result > USHRT_MAX) {
      LOG(FATAL) << "LIBPROCESS_PORT=" << value << " is not a valid port";
    }
    __port__ = result;
  }

  // Create a "server" socket for communicating with other nodes.
  if ((__s__ = ::socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    PLOG(FATAL) << "Failed to initialize, socket";
  }

  // Make socket non-blocking.
  Try<Nothing> nonblock = os::nonblock(__s__);
  if (nonblock.isError()) {
    LOG(FATAL) << "Failed to initialize, nonblock: " << nonblock.error();
  }

  // Set FD_CLOEXEC flag.
  Try<Nothing> cloexec = os::cloexec(__s__);
  if (cloexec.isError()) {
    LOG(FATAL) << "Failed to initialize, cloexec: " << cloexec.error();
  }

  // Allow address reuse.
  int on = 1;
  if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
    PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
  }

  // Set up socket.
  sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = PF_INET;
  addr.sin_addr.s_addr = __ip__;
  addr.sin_port = htons(__port__);

  if (bind(__s__, (sockaddr*) &addr, sizeof(addr)) < 0) {
    PLOG(FATAL) << "Failed to initialize, bind "
                << inet_ntoa(addr.sin_addr) << ":" << __port__;
  }

  // Lookup and store assigned ip and assigned port.
  socklen_t addrlen = sizeof(addr);
  if (getsockname(__s__, (sockaddr*) &addr, &addrlen) < 0) {
    PLOG(FATAL) << "Failed to initialize, getsockname";
  }

  __ip__ = addr.sin_addr.s_addr;
  __port__ = ntohs(addr.sin_port);

  // Lookup hostname if missing ip or if ip is 127.0.0.1 in case we
  // actually have a valid external ip address. Note that we need only
  // one ip address, so that other processes can send and receive and
  // don't get confused as to whom they are sending to.
  //
  // NOTE(review): 2130706433 is 0x7F000001, i.e., 127.0.0.1 as a host
  // byte order integer, whereas '__ip__' holds 's_addr' (network byte
  // order). On little-endian hosts this comparison therefore does not
  // match 127.0.0.1. Left as is because changing it alters behavior
  // when LIBPROCESS_IP=127.0.0.1 is set -- confirm the intent.
  if (__ip__ == 0 || __ip__ == 2130706433) {
    char hostname[512];

    if (gethostname(hostname, sizeof(hostname)) < 0) {
      // 'gethostname' reports failure through 'errno' (not
      // 'h_errno'), so use PLOG to include the matching error.
      PLOG(FATAL) << "Failed to initialize, gethostname";
    }

    // Lookup IP address of local hostname.
    hostent* he;

    if ((he = gethostbyname2(hostname, AF_INET)) == NULL) {
      LOG(FATAL) << "Failed to initialize, gethostbyname2: "
                 << hstrerror(h_errno);
    }

    __ip__ = *((uint32_t *) he->h_addr_list[0]);
  }

  if (listen(__s__, 500000) < 0) {
    PLOG(FATAL) << "Failed to initialize, listen";
  }

  // Setup event loop.
#ifdef __sun__
  loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
#else
  loop = ev_default_loop(EVFLAG_AUTO);
#endif // __sun__

  ev_async_init(&async_watcher, handle_async);
  ev_async_start(loop, &async_watcher);

  ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
  ev_timer_again(loop, &timeouts_watcher);

  ev_io_init(&server_watcher, accept, __s__, EV_READ);
  ev_io_start(loop, &server_watcher);

//   ev_child_init(&child_watcher, child_exited, pid, 0);
//   ev_child_start(loop, &cw);

//   /* Install signal handler. */
//   struct sigaction sa;

//   sa.sa_handler = ev_sighandler;
//   sigfillset (&sa.sa_mask);
//   sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
//   sigaction (w->signum, &sa, 0);

//   sigemptyset (&sa.sa_mask);
//   sigaddset (&sa.sa_mask, w->signum);
//   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);

  pthread_t thread; // For now, not saving handles on our threads.
  if (pthread_create(&thread, NULL, serve, loop) != 0) {
    LOG(FATAL) << "Failed to initialize, pthread_create";
  }

  // Need to set initializing here so that we can actually invoke
  // 'spawn' below for the garbage collector.
  initializing = false;

  // TODO(benh): Make sure creating the garbage collector, logging
  // process, and profiler always succeeds and use supervisors to make
  // sure that none terminate.

  // Create global garbage collector process.
  gc = spawn(new GarbageCollector());

  // Create global help process.
  help = spawn(new Help(), true);

  // Create the global logging process.
  spawn(new Logging(), true);

  // Create the global profiler process.
  spawn(new Profiler(), true);

  // Create the global system statistics process.
  spawn(new System(), true);

  // Create the global statistics.
  // TODO(dhamon): Plumb this through to metrics.
  // value = getenv("LIBPROCESS_STATISTICS_WINDOW");
  // if (value != NULL) {
  //   Try<Duration> window = Duration::parse(string(value));
  //   if (window.isError()) {
  //     LOG(FATAL) << "LIBPROCESS_STATISTICS_WINDOW=" << value
  //                << " is not a valid duration: " << window.error();
  //   }
  //   statistics = new Statistics(window.get());
  // } else {
  //   // TODO(bmahler): Investigate memory implications of this window
  //   // size. We may also want to provide a maximum memory size rather than
  //   // time window. Or, offload older data to disk, etc.
  //   statistics = new Statistics(LIBPROCESS_STATISTICS_WINDOW);
  // }

  // Ensure metrics process is running.
  // TODO(bmahler): Consider initializing this consistently with
  // the other global Processes.
  MetricsProcess* metricsProcess = MetricsProcess::instance();
  CHECK_NOTNULL(metricsProcess);

  // Initialize the mime types.
  mime::initialize();

  // Initialize the response statuses.
  http::initialize();

  // Add a route for getting process information.
  lambda::function<Future<Response>(const Request&)> __processes__ =
    lambda::bind(&ProcessManager::__processes__, process_manager, lambda::_1);

  new Route("/__processes__", None(), __processes__);

  // Render the ip we ended up with for the log message below.
  char temp[INET_ADDRSTRLEN];
  if (inet_ntop(AF_INET, (in_addr*) &__ip__, temp, INET_ADDRSTRLEN) == NULL) {
    PLOG(FATAL) << "Failed to initialize, inet_ntop";
  }

  VLOG(1) << "libprocess is initialized on " << temp << ":" << __port__
          << " for " << cpus << " cpus";
}


// Tears down libprocess by deleting the global ProcessManager that
// was created in 'initialize'. Note that nothing else created in
// 'initialize' (e.g., 'socket_manager') is released here.
void finalize()
{
  delete process_manager;
}


// Returns the ip stored in '__ip__', making sure libprocess has been
// initialized first (which is what sets that value).
uint32_t ip()
{
  process::initialize();

  const uint32_t address = __ip__;
  return address;
}


// Returns the port stored in '__port__', making sure libprocess has
// been initialized first (which is what sets that value).
uint16_t port()
{
  process::initialize();

  const uint16_t number = __port__;
  return number;
}


// Creates a proxy for '_socket' with a generated "__http__" id.
HttpProxy::HttpProxy(const Socket& _socket)
  : ProcessBase(ID::generate("__http__")), socket(_socket)
{
}


// Closes the pipe being streamed from (if any) and then drains the
// queue of outstanding items, discarding each item's future and
// deleting the item.
HttpProxy::~HttpProxy()
{
  // Need to make sure response producers know not to continue to
  // create a response (streaming or otherwise).
  if (pipe.isSome()) {
    os::close(pipe.get());
  }
  pipe = None();

  while (!items.empty()) {
    Item* item = items.front();

    // Attempt to discard the future.
    item->future->discard();

    // But it might have already been ready. In general, we need to
    // wait until this future is potentially ready in order to attempt
    // to close a pipe if one exists.
    item->future->onReady(lambda::bind(&Item::cleanup, lambda::_1));

    items.pop();
    delete item;
  }
}


void HttpProxy::enqueue(const Response& response, const Request& request)
{
  handle(new Future<Response>(response), request);
}


// Queues 'future' as the (eventual) response for 'request'. If the
// queue was empty, starts waiting on it right away; otherwise it gets
// picked up once the items ahead of it have been processed.
void HttpProxy::handle(Future<Response>* future, const Request& request)
{
  // Remember whether anything was already queued before adding.
  const bool idle = items.empty();

  items.push(new Item(request, future));

  if (idle) {
    next();
  }
}


void HttpProxy::next()
{
  if (items.size() > 0) {
    // Wait for any transition of the future.
    items.front()->future->onAny(
        defer(self(), &HttpProxy::waited, lambda::_1));
  }
}


// Invoked once the future of the item at the front of the queue has
// transitioned. Processes that item, removes it from the queue, and
// moves on to the next item unless 'process' reported that it isn't
// done yet.
void HttpProxy::waited(const Future<Response>& future)
{
  CHECK(items.size() > 0);
  Item* item = items.front();

  CHECK(future == *item->future);

  // Process the item and determine if we're done or not (so we know
  // whether to start waiting on the next responses).
  const bool processed = process(*item->future, item->request);

  items.pop();
  delete item;

  if (!processed) {
    return;
  }

  next();
}


bool HttpProxy::process(const Future<Response>& future, const Request& request)
{
  if (!future.isReady()) {
    // TODO(benh): Consider handling other "states" of future
    // (discarded, failed, etc) with different HTTP statuses.
    socket_manager->send(ServiceUnavailable(), request, socket);
    return true; // All done, can process next response.
  }

  Response response = future.get();

  // If the response specifies a path, try and perform a sendfile.
  if (response.type == Response::PATH) {
    // Make sure no body is sent (this is really an error and
    // should be reported and no response sent.
    response.body.clear();

    const string& path = response.path;
    int fd = open(path.c_str(), O_RDONLY);
    if (fd < 0) {
      if (errno == ENOENT || errno == ENOTDIR) {
          VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
          socket_manager->send(NotFound(), request, socket);
      } else {
        const char* error = strerror(errno);
        VLOG(1) << "Failed to send file at '" << path << "': " << error;
        socket_manager->send(InternalServerError(), request, socket);
      }
    } else {
      struct stat s; // Need 'struct' because of function named 'stat'.
      if (fstat(fd, &s) != 0) {
        const char* error = strerror(errno);
        VLOG(1) << "Failed to send file at '" << path << "': " << error;
        socket_manager->send(InternalServerError(), request, socket);
      } else if (S_ISDIR(s.st_mode)) {
        VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
        socket_manager->send(NotFound(), request, socket);
      } else {
        // While the user is expected to properly set a 'Content-Type'
        // header, we fill in (or overwrite) 'Content-Length' header.
        stringstream out;
        out << s.st_size;
        response.headers["Content-Length"] = out.str();

        if (s.st_size == 0) {
          socket_manager->send(response, request, socket);
          return true; // All done, can process next request.
        }

        VLOG(1) << "Sending file at '" << path << "' with length " << s.st_size;

        // TODO(benh): Consider a way to have the socket manager turn
        // on TCP_CORK for both sends and then turn it off.
        socket_manager->send(
            new HttpResponseEncoder(socket, response, request),
            true);

        // Note the file descriptor gets closed by FileEncoder.
        socket_manager->send(
            new FileEncoder(socket, fd, s.st_size),
            request.keepAlive);
      }
    }
  } else if (response.type == Response::PIPE) {
    // Make sure no body is sent (this is really an error and
    // should be reported and no response sent.
    response.body.clear();

    // Make sure the pipe is nonblocking.
    Try<Nothing> nonblock = os::nonblock(response.pipe);
    if (nonblock.isError()) {
      const char* error = strerror(errno);
      VLOG(1) << "Failed make pipe nonblocking: " << error;
      socket_manager->send(InternalServerError(), request, socket);
      return true; // All done, can process next response.
    }

    // While the user is expected to properly set a 'Content-Type'
    // header, we fill in (or overwrite) 'Transfer-Encoding' header.
    response.headers["Transfer-Encoding"] = "chunked";

    VLOG(1) << "Starting \"chunked\" streaming";

    socket_manager->send(
        new HttpResponseEncoder(socket, response, request),
        true);

    pipe = response.pipe;

    io::poll(pipe.get(), io::READ).onAny(
        defer(self(), &Self::stream, lambda::_1, request));

    return false; // Streaming, don't process next response (yet)!
  } else {
    socket_manager->send(response, request, socket);
  }

  return true; // All done, can process next response.
}


// Continues "chunked" streaming from 'pipe' to the socket. Invoked
// when the poll on the pipe has transitioned. If the poll is ready,
// reads from the pipe and sends each read as one chunk until the pipe
// would block (then polls again) or is closed/errors (then sends the
// terminating chunk). If the poll failed or was discarded, a
// '500 Internal Server Error' is sent. Once finished, the pipe is
// closed and the next queued response is started.
void HttpProxy::stream(const Future<short>& poll, const Request& request)
{
  // TODO(benh): Use 'splice' on Linux.

  CHECK(pipe.isSome());

  bool finished = false; // Whether we're done streaming.

  if (poll.isReady()) {
    // Read and write.
    CHECK(poll.get() == io::READ);
    const size_t size = 4 * 1024; // 4K.
    char data[size];
    while (!finished) {
      ssize_t length = ::read(pipe.get(), data, size);
      if (length < 0 && (errno == EINTR)) {
        // Interrupted, try again now.
        continue;
      } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
        // Might block, try again later.
        io::poll(pipe.get(), io::READ).onAny(
            defer(self(), &Self::stream, lambda::_1, request));
        break;
      } else {
        // Build one chunk: either the data just read or, when the
        // pipe is done, the zero-length chunk that ends the stream.
        std::ostringstream out;
        if (length <= 0) {
          // Error or closed, treat both as closed.
          if (length < 0) {
            // Error.
            const char* error = strerror(errno);
            VLOG(1) << "Read error while streaming: " << error;
          }
          out << "0\r\n" << "\r\n";
          finished = true;
        } else {
          // Data!
          out << std::hex << length << "\r\n";
          out.write(data, length);
          out << "\r\n";
        }

        // We always persist the connection when we're not finished
        // streaming.
        socket_manager->send(
            new DataEncoder(socket, out.str()),
            finished ? request.keepAlive : true);
      }
    }
  } else if (poll.isFailed()) {
    VLOG(1) << "Failed to poll: " << poll.failure();
    socket_manager->send(InternalServerError(), request, socket);
    finished = true;
  } else {
    VLOG(1) << "Unexpected discarded future while polling";
    socket_manager->send(InternalServerError(), request, socket);
    finished = true;
  }

  if (finished) {
    // Done with this response: release the pipe and move on to the
    // next queued response (if any).
    os::close(pipe.get());
    pipe = None();
    next();
  }
}


// Sets up the synchronizer used by 'synchronized (this)' blocks.
// NOTE(review): presumably the recursive initializer is what allows
// send(Message*) to call send(Encoder*, bool) while already inside
// 'synchronized (this)' -- confirm against synchronized.hpp.
SocketManager::SocketManager()
{
  synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
}


// Nothing is explicitly released here.
SocketManager::~SocketManager() {}


// Records a newly accepted socket 's' in 'sockets' and returns the
// Socket that was stored for it.
Socket SocketManager::accepted(int s)
{
  synchronized (this) {
    return sockets[s] = Socket(s);
  }

  // The block above always returns.
  UNREACHABLE();
}


// Links 'process' to 'to'. If 'to' lives on a remote node that we
// don't yet have a persistent socket to, one is created and a
// connect is started; either way the link is then recorded in
// 'links'.
void SocketManager::link(ProcessBase* process, const UPID& to)
{
  // TODO(benh): The semantics we want to support for link are such
  // that if there is nobody to link to (local or remote) then an
  // ExitedEvent gets generated. This functionality has only been
  // implemented when the link is local, not remote. Of course, if
  // there is nobody listening on the remote side, then this should
  // work remotely ... but if there is someone listening remotely just
  // not at that id, then it will silently continue executing.

  CHECK(process != NULL);

  Node node(to.ip, to.port);

  synchronized (this) {
    // A node is remote if it differs from our own ip and port.
    const bool remote = node.ip != __ip__ || node.port != __port__;

    // Check if node is remote and there isn't a persistent link.
    if (remote && persists.count(node) == 0) {
      // Okay, no link, let's create a socket.
      Try<int> created = process::socket(AF_INET, SOCK_STREAM, 0);
      if (created.isError()) {
        LOG(FATAL) << "Failed to link, socket: " << created.error();
      }

      const int s = created.get();

      Try<Nothing> nonblock = os::nonblock(s);
      if (nonblock.isError()) {
        LOG(FATAL) << "Failed to link, nonblock: " << nonblock.error();
      }

      Try<Nothing> cloexec = os::cloexec(s);
      if (cloexec.isError()) {
        LOG(FATAL) << "Failed to link, cloexec: " << cloexec.error();
      }

      // Bookkeeping for the new persistent socket.
      sockets[s] = Socket(s);
      nodes[s] = node;

      persists[node] = s;

      // Allocate and initialize a watcher for reading data from this
      // socket. Note that we don't expect to receive anything other
      // than HTTP '202 Accepted' responses which we anyway ignore.
      // We do, however, want to react when it gets closed so we can
      // generate appropriate lost events (since this is a 'link').
      ev_io* watcher = new ev_io();
      watcher->data = new Socket(sockets[s]);

      // Try and connect to the node using this socket.
      sockaddr_in addr;
      memset(&addr, 0, sizeof(addr));
      addr.sin_family = PF_INET;
      addr.sin_port = htons(to.port);
      addr.sin_addr.s_addr = to.ip;

      if (connect(s, (sockaddr*) &addr, sizeof(addr)) >= 0) {
        // Already connected, start reading right away.
        ev_io_init(watcher, ignore_data, s, EV_READ);
      } else {
        if (errno != EINPROGRESS) {
          PLOG(FATAL) << "Failed to link, connect";
        }

        // Wait for socket to be connected.
        ev_io_init(watcher, receiving_connect, s, EV_WRITE);
      }

      // Enqueue the watcher.
      synchronized (watchers) {
        watchers->push(watcher);
      }

      // Interrupt the loop.
      ev_async_send(loop, &async_watcher);
    }

    // Record the link in both directions.
    links.linkers[to].insert(process);
    links.linkees[process].insert(to);

    if (remote) {
      links.remotes[node].insert(to);
    }
  }
}


// Returns the PID of the HttpProxy for 'socket', creating and
// spawning one if the socket is known but has no proxy yet. Returns
// an empty PID if the socket is no longer known.
PID<HttpProxy> SocketManager::proxy(const Socket& socket)
{
  HttpProxy* created = NULL; // Non-null if a new proxy needs spawning.

  synchronized (this) {
    // This socket might have been asked to get closed (e.g., remote
    // side hang up) while a process is attempting to handle an HTTP
    // request. Thus, if there is no more socket, return an empty PID.
    if (sockets.count(socket) > 0) {
      if (proxies.count(socket) > 0) {
        // Already have a proxy for this socket.
        return proxies[socket]->self();
      }

      created = new HttpProxy(sockets[socket]);
      proxies[socket] = created;
    }
  }

  if (created == NULL) {
    return PID<HttpProxy>();
  }

  // Spawn the newly created proxy. Note that we need to do this
  // outside of the synchronized block above to avoid a possible
  // deadlock (because spawn eventually synchronizes on
  // ProcessManager and ProcessManager::cleanup synchronizes on
  // ProcessManager and then SocketManager, so a deadlock results if
  // we do spawn within the synchronized block above).
  return spawn(created, true);
}


// Sends what 'encoder' produces on the encoder's socket. If the
// socket is no longer known, the encoder is deleted. If something is
// already being sent on the socket, the encoder is queued behind it;
// otherwise a write watcher is created to start sending. When
// 'persist' is false the socket is marked for disposal.
void SocketManager::send(Encoder* encoder, bool persist)
{
  CHECK(encoder != NULL);

  synchronized (this) {
    const int s = encoder->socket();

    if (sockets.count(s) == 0) {
      VLOG(1) << "Attempting to send on a no longer valid socket!";
      delete encoder;
    } else {
      // Update whether or not this socket should get disposed after
      // there is no more data to send.
      if (!persist) {
        dispose.insert(s);
      }

      if (outgoing.count(s) > 0) {
        // Sending is already in progress, wait in line.
        outgoing[s].push(encoder);
      } else {
        // Initialize the outgoing queue.
        outgoing[s];

        // Allocate and initialize the watcher.
        ev_io* watcher = new ev_io();
        watcher->data = encoder;

        ev_io_init(watcher, encoder->sender(), s, EV_WRITE);

        // Enqueue the watcher and interrupt the loop.
        synchronized (watchers) {
          watchers->push(watcher);
        }

        ev_async_send(loop, &async_watcher);
      }
    }
  }
}


void SocketManager::send(
    const Response& response,
    const Request& request,
    const Socket& socket)
{
  bool persist = request.keepAlive;

  // Don't persist the connection if the headers include
  // 'Connection: close'.
  if (response.headers.contains("Connection")) {
    if (response.headers.get("Connection").get() == "close") {
      persist = false;
    }
  }

  send(new HttpResponseEncoder(socket, response, request), persist);
}


// Sends 'message' to the node given by 'message->to'. If a persistent
// or temporary socket to that node already exists the message is sent
// on it; otherwise a temporary socket is created, a connect is
// started, and watchers for reading from and writing to the socket
// are handed to the event loop.
void SocketManager::send(Message* message)
{
  CHECK(message != NULL);

  Node node(message->to.ip, message->to.port);

  synchronized (this) {
    // Check if there is already a socket.
    bool persist = persists.count(node) > 0;
    bool temp = temps.count(node) > 0;
    if (persist || temp) {
      // Prefer the persistent socket if there is one.
      int s = persist ? persists[node] : temps[node];
      CHECK(sockets.count(s) > 0);
      send(new MessageEncoder(sockets[s], message), persist);
    } else {
      // No peristent or temporary socket to the node currently
      // exists, so we create a temporary one.
      Try<int> socket = process::socket(AF_INET, SOCK_STREAM, 0);
      if (socket.isError()) {
        LOG(FATAL) << "Failed to send, socket: " << socket.error();
      }

      int s = socket.get();

      Try<Nothing> nonblock = os::nonblock(s);
      if (nonblock.isError()) {
        LOG(FATAL) << "Failed to send, nonblock: " << nonblock.error();
      }

      Try<Nothing> cloexec = os::cloexec(s);
      if (cloexec.isError()) {
        LOG(FATAL) << "Failed to send, cloexec: " << cloexec.error();
      }

      // Bookkeeping for the new temporary socket.
      sockets[s] = Socket(s);
      nodes[s] = node;
      temps[node] = s;

      // Temporary sockets get disposed once there is nothing more to
      // send (see SocketManager::next).
      dispose.insert(s);

      // Initialize the outgoing queue.
      outgoing[s];

      // Allocate and initialize a watcher for reading data from this
      // socket. Note that we don't expect to receive anything other
      // than HTTP '202 Accepted' responses which we anyway ignore.
      ev_io* watcher = new ev_io();
      watcher->data = new Socket(sockets[s]);

      ev_io_init(watcher, ignore_data, s, EV_READ);

      // Enqueue the watcher.
      synchronized (watchers) {
        watchers->push(watcher);
      }

      // Allocate and initialize a watcher for sending the message.
      watcher = new ev_io();
      watcher->data = new MessageEncoder(sockets[s], message);

      // Try and connect to the node using this socket.
      sockaddr_in addr;
      memset(&addr, 0, sizeof(addr));
      addr.sin_family = PF_INET;
      addr.sin_port = htons(message->to.port);
      addr.sin_addr.s_addr = message->to.ip;

      if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
        // EINPROGRESS means the connect will complete later, anything
        // else is treated as fatal.
        if (errno != EINPROGRESS) {
          PLOG(FATAL) << "Failed to send, connect";
        }

        // Initialize watcher for connecting.
        ev_io_init(watcher, sending_connect, s, EV_WRITE);
      } else {
        // Initialize watcher for sending.
        ev_io_init(watcher, send_data, s, EV_WRITE);
      }

      // Enqueue the watcher.
      synchronized (watchers) {
        watchers->push(watcher);
      }

      // Interrupt the loop.
      ev_async_send(loop, &async_watcher);
    }
  }
}


// Returns the next encoder queued for socket 's', or NULL if there is
// nothing more to send. When the queue is empty it is erased and, if
// the socket was marked for disposal, the bookkeeping for the socket
// (and any proxy for it) is cleaned up as well.
Encoder* SocketManager::next(int s)
{
  HttpProxy* proxy = NULL; // Non-null if needs to be terminated.

  synchronized (this) {
    // We cannot assume 'sockets.count(s) > 0' here because it's
    // possible that 's' has been removed with a a call to
    // SocketManager::close. For example, it could be the case that a
    // socket has gone to CLOSE_WAIT and the call to 'recv' in
    // recv_data returned 0 causing SocketManager::close to get
    // invoked. Later a call to 'send' or 'sendfile' (e.g., in
    // send_data or send_file) can "succeed" (because the socket is
    // not "closed" yet because there are still some Socket
    // references, namely the reference being used in send_data or
    // send_file!). However, when SocketManger::next is actually
    // invoked we find out there there is no more data and thus stop
    // sending.
    // TODO(benh): Should we actually finish sending the data!?
    if (sockets.count(s) > 0) {
      CHECK(outgoing.count(s) > 0);

      if (!outgoing[s].empty()) {
        // More messages!
        Encoder* encoder = outgoing[s].front();
        outgoing[s].pop();
        return encoder;
      } else {
        // No more messages ... erase the outgoing queue.
        outgoing.erase(s);

        if (dispose.count(s) > 0) {
          // This is either a temporary socket we created or it's a
          // socket that we were receiving data from and possibly
          // sending HTTP responses back on. Clean up either way.
          if (nodes.count(s) > 0) {
            // 'node' refers into 'nodes', so it must not be used
            // after 'nodes.erase(s)' below.
            const Node& node = nodes[s];
            CHECK(temps.count(node) > 0 && temps[node] == s);
            temps.erase(node);
            nodes.erase(s);
          }

          if (proxies.count(s) > 0) {
            proxy = proxies[s];
            proxies.erase(s);
          }

          dispose.erase(s);
          sockets.erase(s);

          // We don't actually close the socket (we wait for the Socket
          // abstraction to close it once there are no more references),
          // but we do shutdown the receiving end so any DataDecoder
          // will get cleaned up (which might have the last reference).
          shutdown(s, SHUT_RD);
        }
      }
    }
  }

  // We terminate the proxy outside the synchronized block to avoid
  // possible deadlock between the ProcessManager and SocketManager
  // (see comment in SocketManager::proxy for more information).
  if (proxy != NULL) {
    terminate(proxy);
  }

  return NULL;
}


// Drops everything the SocketManager tracks for socket 's': queued
// encoders, node bookkeeping, any associated HttpProxy, and the
// entries in 'dispose' and 'sockets'. Only the read side of the
// descriptor is shut down here; the descriptor itself is left open.
void SocketManager::close(int s)
{
  // Set when 's' had a proxy; it gets terminated after the
  // synchronized block below.
  HttpProxy* proxy = NULL;

  synchronized (this) {
    // A socket can be asked to close more than once (e.g., a failed
    // write triggers a close and later the read side gets closed and
    // triggers another). Only act on sockets we still know about.
    if (sockets.count(s) > 0) {
      // Free any encoders that never got sent.
      if (outgoing.count(s) > 0) {
        while (!outgoing[s].empty()) {
          delete outgoing[s].front();
          outgoing[s].pop();
        }

        outgoing.erase(s);
      }

      // Undo the bookkeeping for sockets used to talk to a node.
      if (nodes.count(s) > 0) {
        const Node& peer = nodes[s];

        const bool persistent =
          persists.count(peer) > 0 && persists[peer] == s;

        if (persistent) {
          // Exited events are only generated for persistent sockets.
          persists.erase(peer);
          exited(peer); // Generate ExitedEvent(s)!
        } else if (temps.count(peer) > 0 && temps[peer] == s) {
          temps.erase(peer);
        }

        nodes.erase(s);
      }

      // Remember the proxy (if any) so it can be terminated later.
      if (proxies.count(s) > 0) {
        proxy = proxies[s];
        proxies.erase(s);
      }

      // Any 'ignore_data' reader might be holding the last Socket
      // reference, so shut down reads to make it see EOF (a read
      // returning 0) and stop. We do not fully close the socket since
      // ~Socket takes care of that (see the note at the end of this
      // function). This has to happen before 'sockets.erase(s)' to
      // avoid racing with the last reference living in 'sockets'.
      shutdown(s, SHUT_RD);

      dispose.erase(s);
      sockets.erase(s);
    }
  }

  // Terminating the proxy is done outside the synchronized block to
  // avoid a possible deadlock between the ProcessManager and the
  // SocketManager.
  if (proxy != NULL) {
    terminate(proxy);
  }

  // Note that we deliberately do not call:
  //
  //   close(s);
  //
  // An HttpProxy might be in the middle of sending a response with
  // SocketManager::send(), or a process might be responding to
  // another Request (e.g., doing a sendfile). Because those things
  // happen asynchronously, closing now could let the descriptor get
  // reused before they finish and we would end up sending data on the
  // wrong socket! Instead the last reference to our Socket object
  // closes the socket. Since the socket is no longer in 'sockets',
  // any attempt to send with it will just get ignored.
  // TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
  // should keep the file descriptor valid until the last Socket
  // reference does a close but force all libev watchers to stop?
}


// Enqueues an ExitedEvent on every local process that linked to a
// process living on 'node', then removes the link bookkeeping for
// those linkees and for the node itself.
void SocketManager::exited(const Node& node)
{
  // TODO(benh): It would be cleaner if this routine could call back
  // into ProcessManager ... then we wouldn't have to convince
  // ourselves that the accesses to each Process object will always be
  // valid.
  synchronized (this) {
    // Nothing to do unless somebody linked to a process on this node.
    if (links.remotes.contains(node)) {
      foreach (const UPID& linkee, links.remotes[node]) {
        CHECK(links.linkers.contains(linkee));

        // Tell each linker and make it forget about the linkee.
        foreach (ProcessBase* linker, links.linkers[linkee]) {
          linker->enqueue(new ExitedEvent(linkee));

          CHECK(links.linkees.contains(linker));

          links.linkees[linker].erase(linkee);
          if (links.linkees[linker].empty()) {
            links.linkees.erase(linker);
          }
        }

        links.linkers.erase(linkee);
      }

      links.remotes.erase(node);
    }
  }
}


// Handles a local process exiting: removes the links 'process' itself
// created, then enqueues an ExitedEvent on every process that had
// linked to it.
void SocketManager::exited(ProcessBase* process)
{
  // Enqueueing a single exited event can be enough for the process to
  // get deleted (e.g., by the garbage collector), after which neither
  // dereferencing 'process' nor using its address is safe. Capture
  // the pid up front.
  const UPID pid = process->pid;

  // Capture the process' current time as well so that the clocks of
  // the linked processes can be updated below.
  const Time time = Clock::now(process);

  synchronized (this) {
    // Drop every link this process created. If it was the last linker
    // of a remote linkee, that linkee must also be removed from the
    // remotes!
    if (links.linkees.contains(process)) {
      foreach (const UPID& linkee, links.linkees[process]) {
        CHECK(links.linkers.contains(linkee));

        links.linkers[linkee].erase(process);

        if (!links.linkers[linkee].empty()) {
          continue; // Other linkers remain for this linkee.
        }

        links.linkers.erase(linkee);

        // No linkers are left, so a remote linkee no longer needs to
        // be tracked under its node.
        Node node(linkee.ip, linkee.port);
        if (node.ip != __ip__ || node.port != __port__) {
          CHECK(links.remotes.contains(node));

          links.remotes[node].erase(linkee);
          if (links.remotes[node].empty()) {
            links.remotes.erase(node);
          }
        }
      }

      links.linkees.erase(process);
    }

    // Notify whoever linked to the exiting process (if anybody).
    if (links.linkers.contains(pid)) {
      foreach (ProcessBase* linker, links.linkers[pid]) {
        CHECK(linker != process) << "Process linked with itself";
        Clock::update(linker, time);
        linker->enqueue(new ExitedEvent(pid));

        // The linker no longer has this pid as a linkee.
        CHECK(links.linkees.contains(linker));

        links.linkees[linker].erase(pid);
        if (links.linkees[linker].empty()) {
          links.linkees.erase(linker);
        }
      }

      links.linkers.erase(pid);
    }
  }
}


// Constructs the manager. '_delegate' is the id of the process that
// ProcessManager::handle tries when a request has no other receiver.
ProcessManager::ProcessManager(const string& _delegate)
  : delegate(_delegate)
{
  // Nothing is running yet.
  running = 0;

  // Both locks are recursive.
  synchronizer(runq) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
  synchronizer(processes) = SYNCHRONIZED_INITIALIZER_RECURSIVE;

  // Make the write to 'running' visible in other threads.
  __sync_synchronize();
}


// Terminates and waits for every process still registered.
ProcessManager::~ProcessManager()
{
  // Take one process at a time and do not hold the lock while
  // terminating it: terminating one process might trigger other
  // terminations, so the map is re-examined on every iteration.
  for (;;) {
    ProcessBase* process = NULL;

    synchronized (processes) {
      if (!processes.empty()) {
        process = processes.begin()->second;
      }
    }

    if (process == NULL) {
      break; // No processes left.
    }

    process::terminate(process);
    process::wait(process);
  }
}


// Returns a reference to the local process identified by 'pid', or a
// NULL reference if 'pid' is not local or no such process is
// registered.
ProcessReference ProcessManager::use(const UPID& pid)
{
  const bool local = pid.ip == __ip__ && pid.port == __port__;

  if (!local) {
    return ProcessReference(NULL);
  }

  synchronized (processes) {
    if (processes.count(pid.id) != 0) {
      // The ProcessReference constructor _must_ run while the lock on
      // processes is held so that waiting for references is atomic
      // (i.e., race free).
      return ProcessReference(processes[pid.id]);
    }
  }

  return ProcessReference(NULL);
}


// Routes a request read from 'socket'. A libprocess request is parsed
// into a message and delivered to its destination; any other request
// is delivered as an HttpEvent to the process named by the first path
// component (or to the delegate). Returns the result of the delivery,
// or false when the request could not be routed. On every path
// 'request' is either deleted here or handed to the HttpEvent that
// gets delivered.
bool ProcessManager::handle(
    const Socket& socket,
    Request* request)
{
  CHECK(request != NULL);

  // A libprocess request (i.e., 'User-Agent: libprocess/id@ip:port')
  // carries a message, so try to parse it as one.
  if (libprocess(request)) {
    Message* message = parse(request);

    if (message == NULL) {
      VLOG(1) << "Failed to handle libprocess message: "
              << request->method << " " << request->path
              << " (User-Agent: " << request->headers["User-Agent"] << ")";

      delete request;
      return false;
    }

    // TODO(benh): Use the sender PID when delivering in order to
    // capture happens-before timing relationships for testing.
    bool accepted = deliver(message->to, new MessageEvent(message));

    // Get the HttpProxy pid for this socket.
    PID<HttpProxy> proxy = socket_manager->proxy(socket);

    // An HTTP response is only sent back when the request did not
    // come from libprocess (judged by the User-Agent). Older versions
    // of libprocess would try to read the response and parse it as an
    // HTTP request, which would fail and get the socket closed (now
    // libprocess ignores responses, see ignore_data).
    Option<string> agent = request->headers.get("User-Agent");
    const bool respond =
      agent.get("").find("libprocess/") == string::npos;

    if (respond && accepted) {
      VLOG(2) << "Accepted libprocess message to " << request->path;
      dispatch(proxy, &HttpProxy::enqueue, Accepted(), *request);
    } else if (respond) {
      VLOG(1) << "Failed to handle libprocess message to "
              << request->path << ": not found";
      dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);
    }

    delete request;

    return accepted;
  }

  // From here on this is a plain HTTP request. The code below assumes
  // the path starts with a '/', so reject anything else.
  if (request->path.find('/') != 0) {
    VLOG(1) << "Returning '400 Bad Request' for '" << request->path << "'";

    // Responses go through the HttpProxy for this socket so that the
    // order of requests is respected (HTTP/1.1 pipelining).
    PID<HttpProxy> proxy = socket_manager->proxy(socket);
    dispatch(proxy, &HttpProxy::enqueue, BadRequest(), *request);

    delete request;
    return false;
  }

  // Requests with relative paths (i.e., containing "/..") are ignored.
  if (request->path.find("/..") != string::npos) {
    VLOG(1) << "Returning '404 Not Found' for '" << request->path
            << "' (ignoring requests with relative paths)";

    // Responses go through the HttpProxy for this socket so that the
    // order of requests is respected (HTTP/1.1 pipelining).
    PID<HttpProxy> proxy = socket_manager->proxy(socket);
    dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);

    delete request;
    return false;
  }

  // Break the path into its '/' separated components.
  vector<string> tokens = strings::tokenize(request->path, "/");

  // Pick a receiver from the path, falling back to the delegate.
  ProcessReference receiver;

  if (tokens.size() == 0) {
    if (delegate != "") {
      request->path = "/" + delegate;
      receiver = use(UPID(delegate, __ip__, __port__));
    }
  } else {
    // The first component may be percent-encoded.
    Try<string> decode = http::decode(tokens[0]);
    if (decode.isError()) {
      VLOG(1) << "Failed to decode URL path: " << decode.error();
    } else {
      receiver = use(UPID(decode.get(), __ip__, __port__));
    }
  }

  if (!receiver && delegate != "") {
    // Try and delegate the request.
    request->path = "/" + delegate + request->path;
    receiver = use(UPID(delegate, __ip__, __port__));
  }

  if (receiver) {
    // TODO(benh): Use the sender PID in order to capture
    // happens-before timing relationships for testing.
    return deliver(receiver, new HttpEvent(socket, request));
  }

  // Nobody can receive this request, so respond with an error.
  VLOG(1) << "Returning '404 Not Found' for '" << request->path << "'";

  // Responses go through the HttpProxy for this socket so that the
  // order of requests is respected (HTTP/1.1 pipelining).
  PID<HttpProxy> proxy = socket_manager->proxy(socket);
  dispatch(proxy, &HttpProxy::enqueue, NotFound(), *request);

  delete request;
  return false;
}


// Enqueues 'event' on 'receiver' and returns true. While the clock is
// paused the receiver's time is first brought up to date.
bool ProcessManager::deliver(
    ProcessBase* receiver,
    Event* event,
    ProcessBase* sender)
{
  CHECK(event != NULL);

  // With a manual clock, advance the receiver's current time from the
  // sender's where necessary so the happens-before relationship
  // between the two is preserved. This assumes the sender stays valid
  // for at least the duration of this routine (its current time gets
  // looked up). Clock::paused() is checked again once the lock on
  // timeouts is held.
  if (Clock::paused()) {
    synchronized (timeouts) {
      if (Clock::paused()) {
        if (sender == NULL) {
          Clock::update(receiver, Clock::now());
        } else {
          Clock::order(sender, receiver);
        }
      }
    }
  }

  receiver->enqueue(event);

  return true;
}


// Delivers 'event' to the local process identified by 'to'. If no
// such process can be referenced the event is deleted and false is
// returned.
bool ProcessManager::deliver(
    const UPID& to,
    Event* event,
    ProcessBase* sender)
{
  CHECK(event != NULL);

  ProcessReference receiver = use(to);

  if (!receiver) {
    // Nobody to deliver to, so drop the event.
    delete event;
    return false;
  }

  return deliver(receiver, event, sender);
}


// Registers 'process' and puts it on the run queue. Returns the pid
// of the process, or an empty UPID if a process with the same id is
// already registered. When 'manage' is true the process is handed to
// the garbage collector.
UPID ProcessManager::spawn(ProcessBase* process, bool manage)
{
  CHECK(process != NULL);

  synchronized (processes) {
    // Refuse to register two processes under the same id.
    if (processes.count(process->pid.id) > 0) {
      return UPID();
    }

    processes[process->pid.id] = process;
  }

  if (manage) {
    // Let the garbage collector manage the process.
    dispatch(gc, &GarbageCollector::manage<ProcessBase>, process);
  }

  // Copy the PID before enqueueing: a very short process might get
  // run and cleaned up before we return from enqueue (e.g., when
  // 'manage' is set to true), so it can not be read afterwards.
  UPID pid = process->self();

  // Putting the process on the run queue gets 'initialize' invoked.
  enqueue(process);

  VLOG(2) << "Spawned process " << pid;

  return pid;
}


// Runs 'process' on the calling thread. A process that has never run
// (state BOTTOM) first gets initialize() invoked. Events are then
// served one at a time until the queue is empty (the process becomes
// BLOCKED) or the process terminates, in which case it is cleaned up.
// '__process__' is set for the duration of the call and 'running' is
// decremented on the way out.
void ProcessManager::resume(ProcessBase* process)
{
  __process__ = process;

  VLOG(2) << "Resuming " << process->pid << " at " << Clock::now();

  bool terminate = false;
  bool blocked = false;

  CHECK(process->state == ProcessBase::BOTTOM ||
        process->state == ProcessBase::READY);

  if (process->state == ProcessBase::BOTTOM) {
    process->state = ProcessBase::RUNNING;

    // Treat an exception from initialize() the same way as one from
    // serve() below: report it and terminate the process.
    try {
      process->initialize();
    } catch (const std::exception& e) {
      std::cerr << "libprocess: " << process->pid
                << " terminating due to "
                << e.what() << std::endl;
      terminate = true;
    } catch (...) {
      std::cerr << "libprocess: " << process->pid
                << " terminating due to unknown exception" << std::endl;
      terminate = true;
    }

    // The loop below is skipped when 'terminate' is set, so the
    // process must be cleaned up here. Otherwise it would stay
    // registered in state RUNNING forever, never serving events and
    // never releasing threads waiting on it. The process must not be
    // touched after this point.
    if (terminate) {
      cleanup(process);
    }
  }

  while (!terminate && !blocked) {
    Event* event = NULL;

    // Take the next event, or mark the process BLOCKED if none.
    process->lock();
    {
      if (process->events.size() > 0) {
        event = process->events.front();
        process->events.pop_front();
        process->state = ProcessBase::RUNNING;
      } else {
        process->state = ProcessBase::BLOCKED;
        blocked = true;
      }
    }
    process->unlock();

    if (!blocked) {
      CHECK(event != NULL);

      // Determine if we should filter this event.
      synchronized (filterer) {
        if (filterer != NULL) {
          bool filter = false;

          // Forwards each event type to the installed filter and
          // records its verdict in '*filter'.
          struct FilterVisitor : EventVisitor
          {
            explicit FilterVisitor(bool* _filter) : filter(_filter) {}

            virtual void visit(const MessageEvent& event)
            {
              *filter = filterer->filter(event);
            }

            virtual void visit(const DispatchEvent& event)
            {
              *filter = filterer->filter(event);
            }

            virtual void visit(const HttpEvent& event)
            {
              *filter = filterer->filter(event);
            }

            virtual void visit(const ExitedEvent& event)
            {
              *filter = filterer->filter(event);
            }

            bool* filter;
          } visitor(&filter);

          event->visit(&visitor);

          if (filter) {
            delete event;
            continue; // Try and execute the next event.
          }
        }
      }

      // Determine if we should terminate.
      terminate = event->is<TerminateEvent>();

      // Now service the event.
      try {
        process->serve(*event);
      } catch (const std::exception& e) {
        std::cerr << "libprocess: " << process->pid
                  << " terminating due to "
                  << e.what() << std::endl;
        terminate = true;
      } catch (...) {
        std::cerr << "libprocess: " << process->pid
                  << " terminating due to unknown exception" << std::endl;
        terminate = true;
      }

      delete event;

      // After cleanup the loop condition ends the loop without
      // touching the process again.
      if (terminate) {
        cleanup(process);
      }
    }
  }

  __process__ = NULL;

  CHECK_GE(running, 1);
  __sync_fetch_and_sub(&running, 1);
}


// Tears down 'process': marks it TERMINATING and deletes its pending
// events, waits for outstanding references to go away, removes it
// from 'processes', marks it TERMINATED, lets the socket manager
// generate exited events, and finally opens the gate (if any) that
// threads in ProcessManager::wait are waiting at.
void ProcessManager::cleanup(ProcessBase* process)
{
  VLOG(2) << "Cleaning up " << process->pid;

  // First, set the terminating state so no more events will get
  // enqueued and delete all the pending events. We want to delete the
  // events before we hold the processes lock because deleting an
  // event could cause code outside libprocess to get executed which
  // might cause a deadlock with the processes lock. Likewise,
  // deleting the events now rather than later has the nice property
  // of making sure that any events that might have gotten enqueued on
  // the process we are cleaning up will get dropped (since it's
  // terminating) and eliminates the potential of enqueueing them on
  // another process that gets spawned with the same PID.
  deque<Event*> events;

  // Copy the queue out under the process lock so that the events can
  // be deleted after the lock has been released.
  process->lock();
  {
    process->state = ProcessBase::TERMINATING;
    events = process->events;
    process->events.clear();
  }
  process->unlock();

  // Delete pending events.
  while (!events.empty()) {
    Event* event = events.front();
    events.pop_front();
    delete event;
  }

  // Possible gate non-libprocess threads are waiting at.
  Gate* gate = NULL;

  // Remove process.
  synchronized (processes) {
    // Wait for all process references to get cleaned up. This spins
    // (with the processes lock held) until 'refs' drops to zero.
    while (process->refs > 0) {
#if defined(__i386__) || defined(__x86_64__)
      asm ("pause");
#endif
      __sync_synchronize();
    }

    process->lock();
    {
      CHECK(process->events.empty());

      processes.erase(process->pid.id);

      // Lookup gate to wake up waiting threads.
      map<ProcessBase*, Gate*>::iterator it = gates.find(process);
      if (it != gates.end()) {
        gate = it->second;
        // N.B. The last thread that leaves the gate also frees it.
        gates.erase(it);
      }

      CHECK(process->refs == 0);
      process->state = ProcessBase::TERMINATED;
    }
    process->unlock();

    // Note that we don't remove the process from the clock during
    // cleanup, but rather the clock is reset for a process when it is
    // created (see ProcessBase::ProcessBase). We do this so that
    // SocketManager::exited can access the current time of the
    // process to "order" exited events. TODO(benh): It might make
    // sense to consider storing the time of the process as a field of
    // the class instead.

    // Now we tell the socket manager about this process exiting so
    // that it can create exited events for linked processes. We
    // _must_ do this while synchronized on processes because
    // otherwise another process could attempt to link this process
    // and SocketManager::link would see that the process doesn't
    // exist when it attempts to get a ProcessReference (since we
    // removed the process above) thus causing an exited event, which
    // could cause the process to get deleted (e.g., the garbage
    // collector might link _after_ the process has already been
    // removed from processes thus getting an exited event but we
    // don't want that exited event to fire and actually delete the
    // process until after we have used the process in
    // SocketManager::exited).
    socket_manager->exited(process);

    // ***************************************************************
    // At this point we can no longer dereference the process since it
    // might already be deallocated (e.g., by the garbage collector).
    // ***************************************************************

    // Note that we need to open the gate while synchronized on
    // processes because otherwise we might _open_ the gate before
    // another thread _approaches_ the gate causing that thread to
    // wait on _arrival_ to the gate forever (see
    // ProcessManager::wait).
    if (gate != NULL) {
      gate->open();
    }
  }
}


// Links 'process' to 'to'. Links are recorded by the socket manager;
// if 'to' is local but no such process can be referenced, 'process'
// immediately gets an ExitedEvent instead.
void ProcessManager::link(ProcessBase* process, const UPID& to)
{
  const bool local = to.ip == __ip__ && to.port == __port__;

  if (!local) {
    socket_manager->link(process, to);
    return;
  }

  // For a local pid, hold a reference to the underlying process while
  // the link gets recorded so that a possible ExitedEvent is not
  // missed.
  ProcessReference reference = use(to);

  if (reference) {
    socket_manager->link(process, to);
  } else {
    // The pid isn't valid, so its process must have already died (or
    // hasn't been spawned yet): send a process exit message.
    process->enqueue(new ExitedEvent(to));
  }
}


// Enqueues a TerminateEvent on the local process identified by 'pid'
// (nothing happens if it can not be referenced). The event carries
// the sender's pid, or an empty UPID when there is no sender, and
// 'inject' is passed through to ProcessBase::enqueue.
void ProcessManager::terminate(
    const UPID& pid,
    bool inject,
    ProcessBase* sender)
{
  ProcessReference process = use(pid);

  if (!process) {
    return;
  }

  // While the clock is paused, bring the time of the process up to
  // date first (Clock::paused() is checked again under the lock).
  if (Clock::paused()) {
    synchronized (timeouts) {
      if (Clock::paused()) {
        if (sender == NULL) {
          Clock::update(process, Clock::now());
        } else {
          Clock::order(sender, process);
        }
      }
    }
  }

  const UPID from = sender != NULL ? sender->self() : UPID();

  process->enqueue(new TerminateEvent(from), inject);
}


// Blocks the calling thread until the process identified by 'pid' has
// been cleaned up. Returns true if the process was registered when
// the call was made (and has therefore been waited for), false if no
// such process was found. If the process is sitting on the run queue
// the calling thread is donated to run it before waiting.
bool ProcessManager::wait(const UPID& pid)
{
  // We use a gate for waiters. A gate is single use. That is, a new
  // gate is created when the first thread shows up and wants to wait
  // for a process that currently has no gate. Once that process
  // exits, the last thread to leave the gate will also clean it
  // up. Note that a gate will never get more threads waiting on it
  // after it has been opened, since the process should no longer be
  // valid and therefore will not have an entry in 'processes'.

  Gate* gate = NULL;
  Gate::state_t old; // Only meaningful when 'gate' is non-null.

  ProcessBase* process = NULL; // Set to non-null if we donate thread.

  // Try and approach the gate if necessary.
  synchronized (processes) {
    if (processes.count(pid.id) > 0) {
      process = processes[pid.id];
      CHECK(process->state != ProcessBase::TERMINATED);

      // Check and see if a gate already exists.
      if (gates.find(process) == gates.end()) {
        gates[process] = new Gate();
      }

      gate = gates[process];
      old = gate->approach();

      // Check if it is runnable in order to donate this thread.
      if (process->state == ProcessBase::BOTTOM ||
          process->state == ProcessBase::READY) {
        synchronized (runq) {
          // Taking the process off the run queue makes this thread
          // the one that resumes it.
          list<ProcessBase*>::iterator it =
            find(runq.begin(), runq.end(), process);
          if (it != runq.end()) {
            runq.erase(it);
          } else {
            // Another thread has resumed the process ...
            process = NULL;
          }
        }
      } else {
        // Process is not runnable, so no need to donate ...
        process = NULL;
      }
    }
  }

  if (process != NULL) {
    VLOG(2) << "Donating thread to " << process->pid << " while waiting";
    // resume() overwrites '__process__', so remember and restore it.
    ProcessBase* donator = __process__;
    // resume() decrements 'running' on exit, so account for it here
    // (dequeue() normally does this).
    __sync_fetch_and_add(&running, 1);
    process_manager->resume(process);
    __process__ = donator;
  }

  // TODO(benh): Donating only once may not be sufficient, so we might
  // still deadlock here ... perhaps warn if that's the case?

  // Now arrive at the gate and wait until it opens.
  if (gate != NULL) {
    gate->arrive(old);

    // The last thread to leave the gate deletes it.
    if (gate->empty()) {
      delete gate;
    }

    return true;
  }

  return false;
}


// Appends 'process' to the run queue (it must not already be on it)
// and opens the gate to wake up the processing thread.
void ProcessManager::enqueue(ProcessBase* process)
{
  CHECK(process != NULL);

  // TODO(benh): Check and see if this process has it's own thread. If
  // it does, push it on that threads runq, and wake up that thread if
  // it's not running. Otherwise, check and see which thread this
  // process was last running on, and put it on that threads runq.

  synchronized (runq) {
    CHECK(find(runq.begin(), runq.end(), process) == runq.end());
    runq.insert(runq.end(), process);
  }

  // Wake up the processing thread if necessary.
  gate->open();
}


// Removes and returns the process at the front of the run queue, or
// returns NULL if the queue is empty. 'running' is incremented for
// every process handed out.
ProcessBase* ProcessManager::dequeue()
{
  // TODO(benh): Remove a process from this thread's runq. If there
  // are no processes to run, and this is not a dedicated thread, then
  // steal one from another threads runq.

  ProcessBase* next = NULL;

  synchronized (runq) {
    if (!runq.empty()) {
      next = *runq.begin();
      runq.erase(runq.begin());

      // The running count supports the Clock::settle() operation and
      // must be incremented atomically with removing the process from
      // the runq.
      __sync_fetch_and_add(&running, 1);
    }
  }

  return next;
}


// Polls every 10 milliseconds until nothing is left to do: the run
// queue is empty, no process is running, no timeout is due at the
// current clock time, and no timers are pending. The clock must be
// paused while settling.
void ProcessManager::settle()
{
  bool settled = true;
  do {
    os::sleep(Milliseconds(10));
    settled = true;

    // Hopefully this is the only place we acquire both these locks.
    synchronized (runq) {
      synchronized (timeouts) {
        CHECK(Clock::paused()); // Since another thread could resume the clock!

        const bool queued = !runq.empty();

        __sync_synchronize(); // Read barrier for 'running'.
        const bool busy = running > 0;

        const bool due =
          timeouts->size() > 0 &&
          timeouts->begin()->first <= clock::current;

        settled = !queued && !busy && !due && !pending_timers;
      }
    }
  } while (!settled);
}


// Returns an OK response whose body is a JSON array with one object
// per registered process: its "id" and the list of "events"
// currently sitting in its queue.
Future<Response> ProcessManager::__processes__(const Request&)
{
  JSON::Array result;

  synchronized (processes) {
    foreachvalue (ProcessBase* process, process_manager->processes) {
      JSON::Array queued;

      // Appends a JSON description of each visited event to '*events'.
      struct JSONVisitor : EventVisitor
      {
        explicit JSONVisitor(JSON::Array* _events) : events(_events) {}

        virtual void visit(const MessageEvent& event)
        {
          const Message& message = *event.message;

          JSON::Object entry;
          entry.values["type"] = "MESSAGE";
          entry.values["name"] = message.name;
          entry.values["from"] = string(message.from);
          entry.values["to"] = string(message.to);
          entry.values["body"] = message.body;

          events->values.push_back(entry);
        }

        virtual void visit(const HttpEvent& event)
        {
          const Request& request = *event.request;

          JSON::Object entry;
          entry.values["type"] = "HTTP";
          entry.values["method"] = request.method;
          entry.values["url"] = request.url;

          events->values.push_back(entry);
        }

        virtual void visit(const DispatchEvent& event)
        {
          JSON::Object entry;
          entry.values["type"] = "DISPATCH";
          events->values.push_back(entry);
        }

        virtual void visit(const ExitedEvent& event)
        {
          JSON::Object entry;
          entry.values["type"] = "EXITED";
          events->values.push_back(entry);
        }

        virtual void visit(const TerminateEvent& event)
        {
          JSON::Object entry;
          entry.values["type"] = "TERMINATE";
          events->values.push_back(entry);
        }

        JSON::Array* events;
      } visitor(&queued);

      // The event queue is walked with the process lock held.
      process->lock();
      {
        foreach (Event* event, process->events) {
          event->visit(&visitor);
        }
      }
      process->unlock();

      JSON::Object description;
      description.values["id"] = process->pid.id;
      description.values["events"] = queued;

      result.values.push_back(description);
    }
  }

  return OK(result);
}


// Creates a timer that expires 'duration' from now, records it in
// 'timeouts', and returns it. The timer carries the pid of the
// process currently running on this thread (if any) and 'thunk'.
Timer Timer::create(
    const Duration& duration,
    const lambda::function<void(void)>& thunk)
{
  static uint64_t id = 1; // Start at 1 since Timer() instances use id 0.

  // Assumes Clock::now() does Clock::now(__process__).
  Timeout timeout = Timeout::in(duration);

  UPID pid;
  if (__process__ != NULL) {
    pid = __process__->self();
  }

  Timer timer(__sync_fetch_and_add(&id, 1), timeout, pid, thunk);

  VLOG(3) << "Created a timer for " << timeout.time();

  // Add the timer.
  synchronized (timeouts) {
    const Time when = timer.timeout().time();

    // Decide, before inserting, whether this timer becomes the
    // earliest one.
    const bool earliest =
      timeouts->size() == 0 || when < timeouts->begin()->first;

    (*timeouts)[when].push_back(timer);

    if (earliest) {
      // Need to interrupt the loop to update/set timer repeat.
      // Otherwise the timer repeat is already adequate.
      update_timer = true;
      ev_async_send(loop, &async_watcher);
    }
  }

  return timer;
}


// Removes 'timer' from the pending timeouts. Returns true only if the
// timer was still pending and has actually been removed; returns
// false if it was not found (e.g., it already fired or was already
// canceled), even when other timers share the same timeout time.
bool Timer::cancel(const Timer& timer)
{
  bool canceled = false;
  synchronized (timeouts) {
    // Check if the timeout is still pending, and if so, erase it. In
    // addition, erase an empty list if we just removed the last
    // timeout.
    Time time = timer.timeout().time();
    if (timeouts->count(time) > 0) {
      // Other timers may expire at exactly the same time, so finding
      // the list is not enough: only report success if removing the
      // timer made the list shorter.
      const size_t pending = (*timeouts)[time].size();
      (*timeouts)[time].remove(timer);
      canceled = (*timeouts)[time].size() < pending;

      if ((*timeouts)[time].empty()) {
        timeouts->erase(time);
      }
    }
  }

  return canceled;
}


// Sets up a process in state BOTTOM with no references, a recursive
// mutex, and a pid made of 'id' (or a generated id when 'id' is
// empty) plus this node's ip and port.
ProcessBase::ProcessBase(const string& id)
{
  process::initialize();

  state = ProcessBase::BOTTOM;
  refs = 0;

  // The process mutex is recursive.
  pthread_mutexattr_t attributes;
  pthread_mutexattr_init(&attributes);
  pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE);
  pthread_mutex_init(&m, &attributes);
  pthread_mutexattr_destroy(&attributes);

  if (id != "") {
    pid.id = id;
  } else {
    pid.id = ID::generate();
  }

  pid.ip = __ip__;
  pid.port = __port__;

  // With a manual clock, try and set the current time of the process
  // using the happens-before relationship between creator and
  // createe! Clock::paused() is checked again under the lock.
  if (Clock::paused()) {
    synchronized (timeouts) {
      if (Clock::paused()) {
        clock::currents->erase(this); // In case the address is reused!

        if (__process__ == NULL) {
          Clock::update(this, Clock::now());
        } else {
          Clock::order(__process__, this);
        }
      }
    }
  }
}


// Intentionally empty: the destructor performs no work of its own.
ProcessBase::~ProcessBase() {}


// Adds 'event' to this process' queue: at the front when 'inject' is
// true, at the back otherwise. A BLOCKED process becomes READY and is
// put on the run queue. Events for a process that is TERMINATING or
// TERMINATED are deleted instead.
void ProcessBase::enqueue(Event* event, bool inject)
{
  CHECK(event != NULL);

  lock();
  {
    if (state == TERMINATING || state == TERMINATED) {
      // Too late to accept anything, so drop the event.
      delete event;
    } else {
      if (inject) {
        events.push_front(event);
      } else {
        events.push_back(event);
      }

      // Wake the process up if it was waiting for events.
      if (state == BLOCKED) {
        state = READY;
        process_manager->enqueue(this);
      }

      CHECK(state == BOTTOM ||
            state == READY ||
            state == RUNNING);
    }
  }
  unlock();
}


// Places a message named 'name' from 'from', with the given payload,
// at the front of this process' event queue. Nothing happens when
// 'from' is not a valid pid.
void ProcessBase::inject(
    const UPID& from,
    const string& name,
    const char* data,
    size_t length)
{
  if (!from) {
    return;
  }

  // Passing 'true' puts the event at the front of the queue.
  enqueue(
      new MessageEvent(encode(from, pid, name, string(data, length))),
      true);
}


void ProcessBase::send(
    const UPID& to,
    const string& name,
    const char* data,
    size_t length)
{
  if (!to) {
    return;
  }

  // Encode and transport outgoing message.
  transport(encode(pid, to, name, string(data, length)), this);
}


void ProcessBase::visit(const MessageEvent& event)
{
  if (handlers.message.count(event.message->name) > 0) {
    handlers.message[event.message->name](
        event.message->from,
        event.message->body);
  } else if (delegates.count(event.message->name) > 0) {
    VLOG(1) << "Delegating message '" << event.message->name
            << "' to " << delegates[event.message->name];
    Message* message = new Message(*event.message);
    message->to = delegates[event.message->name];
    transport(message, this);
  }
}


// Handles a dispatch: invokes the function carried by the event,
// passing this process as its argument.
void ProcessBase::visit(const DispatchEvent& event)
{
  (*event.f)(this);
}


void ProcessBase::visit(const HttpEvent& event)
{
  VLOG(1) << "Handling HTTP event for process '" << pid.id << "'"
          << " with path: '" << event.request->path << "'";

  CHECK(event.request->path.find('/') == 0); // See ProcessManager::handle.

  // Split the path by '/'.
  vector<string> tokens = strings::tokenize(event.request->path, "/");
  CHECK(tokens.size() >= 1);
  CHECK_EQ(pid.id, http::decode(tokens[0]).get());

  const string& name = tokens.size() > 1 ? tokens[1] : "";

  if (handlers.http.count(name) > 0) {
    // Create the promise to link with whatever gets returned, as well
    // as a future to wait for the response.
    memory::shared_ptr<Promise<Response> > promise(new Promise<Response>());

    Future<Response>* future = new Future<Response>(promise->future());

    // Get the HttpProxy pid for this socket.
    PID<HttpProxy> proxy = socket_manager->proxy(event.socket);

    // Let the HttpProxy know about this request (via the future).
    dispatch(proxy, &HttpProxy::handle, future, *event.request);

    // Now call the handler and associate the response with the promise.
    promise->associate(handlers.http[name](*event.request));
  } else if (assets.count(name) > 0) {
    OK response;
    response.type = Response::PATH;
    response.path = assets[name].path;

    // Construct the final path by appending remaining tokens.
    for (int i = 2; i < tokens.size(); i++) {
      response.path += "/" + tokens[i];
    }

    // Try and determine the Content-Type from an extension.
    Try<string> basename = os::basename(response.path);
    if (!basename.isError()) {
      size_t index = basename.get().find_last_of('.');
      if (index != string::npos) {
        string extension = basename.get().substr(index);
        if (assets[name].types.count(extension) > 0) {
          response.headers["Content-Type"] = assets[name].types[extension];
        }
      }
    }

    // TODO(benh): Use "text/plain" for assets that don't have an
    // extension or we don't have a mapping for? It might be better to
    // just let the browser guess (or do it's own default).

    // Get the HttpProxy pid for this socket.
    PID<HttpProxy> proxy = socket_manager->proxy(event.socket);

    // Enqueue the response with the HttpProxy so that it respects the
    // order of requests to account for HTTP/1.1 pipelining.
    dispatch(proxy, &HttpProxy::enqueue, response, *event.request);
  } else {
    VLOG(1) << "Returning '404 Not Found' for '" << event.request->path << "'";

    // Get the HttpProxy pid for this socket.
    PID<HttpProxy> proxy = socket_manager->proxy(event.socket);

    // Enqueue the response with the HttpProxy so that it respects the
    // order of requests to account for HTTP/1.1 pipelining.
    dispatch(proxy, &HttpProxy::enqueue, NotFound(), *event.request);
  }
}


void ProcessBase::visit(const ExitedEvent& event)
{
  exited(event.pid);
}


// Handles a terminate event by calling 'finalize'. The event itself
// is not inspected.
void ProcessBase::visit(const TerminateEvent&)
{
  finalize();
}


// Asks the process manager to link this process to 'to', unless
// '!to' holds. Always returns 'to'.
UPID ProcessBase::link(const UPID& to)
{
  const bool skip = !to;

  if (!skip) {
    process_manager->link(this, to);
  }

  return to;
}


bool ProcessBase::route(
    const string& name,
    const Option<string>& help_,
    const HttpRequestHandler& handler)
{
  if (name.find('/') != 0) {
    return false;
  }
  handlers.http[name.substr(1)] = handler;
  dispatch(help, &Help::add, pid.id, name, help_);
  return true;
}



// Spawns 'process' through the process manager and returns the
// result. Returns a default constructed UPID when 'process' is NULL.
UPID spawn(ProcessBase* process, bool manage)
{
  process::initialize();

  if (process == NULL) {
    return UPID();
  }

  // With a paused (manual) clock, give the new process a current
  // time: order it after the spawning process when there is one,
  // otherwise use the clock's current time. The paused state is
  // checked again once 'timeouts' is held.
  if (Clock::paused()) {
    synchronized (timeouts) {
      if (Clock::paused()) {
        if (__process__ != NULL) {
          Clock::order(__process__, process);
        } else {
          Clock::update(process, Clock::now());
        }
      }
    }
  }

  return process_manager->spawn(process, manage);
}


void terminate(const UPID& pid, bool inject)
{
  process_manager->terminate(pid, inject, __process__);
}


// Helper process backing the timed variant of 'wait'. It links to
// the target pid and arms a timeout; whichever of 'exited' or
// 'timeout' runs records the outcome through the 'waited' pointer
// and terminates the waiter.
class WaitWaiter : public Process<WaitWaiter>
{
public:
  WaitWaiter(const UPID& target, const Duration& limit, bool* result)
    : ProcessBase(ID::generate("__waiter__")),
      pid(target),
      duration(limit),
      waited(result) {}

  virtual void initialize()
  {
    VLOG(3) << "Running waiter process for " << pid;
    link(pid);
    delay(duration, self(), &WaitWaiter::timeout);
  }

private:
  // Invoked via the link established in 'initialize'.
  virtual void exited(const UPID&)
  {
    VLOG(3) << "Waiter process waited for " << pid;
    finish(true);
  }

  // Invoked via the delay armed in 'initialize'.
  void timeout()
  {
    VLOG(3) << "Waiter process timed out waiting for " << pid;
    finish(false);
  }

  // Stores the outcome for the caller and terminates this waiter.
  void finish(bool outcome)
  {
    *waited = outcome;
    terminate(self());
  }

  const UPID pid;
  const Duration duration;
  bool* const waited;
};


// Waits for the process identified by 'pid'.
//
// Returns false immediately when '!pid' holds. A duration of
// Seconds(-1) delegates to the process manager's wait; any other
// duration spawns a WaitWaiter, waits for it, and returns the flag
// the waiter recorded.
bool wait(const UPID& pid, const Duration& duration)
{
  process::initialize();

  if (!pid) {
    return false;
  }

  // A process waiting on itself could deadlock; warn about it.
  const bool onSelf = __process__ != NULL && __process__->self() == pid;
  if (onSelf) {
    std::cerr << "\n**** DEADLOCK DETECTED! ****\nYou are waiting on process "
              << pid << " that it is currently executing." << std::endl;
  }

  const bool unbounded = (duration == Seconds(-1));
  if (unbounded) {
    return process_manager->wait(pid);
  }

  bool waited = false;

  WaitWaiter waiter(pid, duration, &waited);
  spawn(waiter);
  wait(waiter);

  return waited;
}


// Replaces the global 'filterer' with 'filter' while synchronized on
// 'filterer'.
void filter(Filter* filter)
{
  process::initialize();

  synchronized (filterer) {
    filterer = filter;
  }
}


void post(const UPID& to, const string& name, const char* data, size_t length)
{
  process::initialize();

  if (!to) {
    return;
  }

  // Encode and transport outgoing message.
  transport(encode(UPID(), to, name, string(data, length)));
}


void post(const UPID& from,
          const UPID& to,
          const string& name,
          const char* data,
          size_t length)
{
  process::initialize();

  if (!to) {
    return;
  }

  // Encode and transport outgoing message.
  transport(encode(from, to, name, string(data, length)));
}


namespace io {

namespace internal {

// Helper/continuation of 'poll' on future discard.
// Signals the given async watcher on the event loop. Installed by
// 'poll' as the discard callback of the future it returns.
void _poll(const memory::shared_ptr<ev_async>& async)
{
  ev_async* watcher = async.get();
  ev_async_send(loop, watcher);
}


// Allocates a 'Poll' and starts an async watcher plus an I/O watcher
// for 'fd'/'events' on the event loop. Returns the future of the
// Poll's promise.
Future<short> poll(int fd, short events)
{
  Poll* state = new Poll();

  ev_async* asyncWatcher = state->watcher.async.get();
  ev_io* ioWatcher = state->watcher.io.get();

  // Both watchers carry a pointer back to the struct.
  asyncWatcher->data = state;
  ioWatcher->data = state;

  // Take our copy of the future before any watcher is started to
  // avoid races with the event loop.
  Future<short> future = state->promise.future();

  // Initialize and start the async watcher.
  ev_async_init(asyncWatcher, discard_poll);
  ev_async_start(loop, asyncWatcher);

  // Stop polling if our future gets discarded. '_poll' may still be
  // invoked by a discard after polling has completed; in that case
  // the event loop is interrupted, but since the async watcher has
  // already been stopped 'discard_poll' is not invoked.
  future.onDiscard(lambda::bind(&_poll, state->watcher.async));

  // Initialize and start the I/O watcher.
  ev_io_init(ioWatcher, polled, fd, events);
  ev_io_start(loop, ioWatcher);

  return future;
}


// Continuation that performs a single non-blocking read on 'fd' and
// completes 'promise' with the outcome.
//
// 'future' is the result of the poll that preceded this call. If the
// read would block (or was interrupted) a new poll is started with
// this function bound as its continuation.
void read(
    int fd,
    void* data,
    size_t size,
    const memory::shared_ptr<Promise<size_t> >& promise,
    const Future<short>& future)
{
  // Nothing to do if the read operation itself has been discarded.
  if (promise->future().hasDiscard()) {
    CHECK(!future.isPending());
    promise->discard();
    return;
  }

  if (size == 0) {
    promise->set(0);
    return;
  }

  if (future.isDiscarded()) {
    promise->fail("Failed to poll: discarded future");
    return;
  }

  if (future.isFailed()) {
    promise->fail(future.failure());
    return;
  }

  ssize_t length = ::read(fd, data, size);

  if (length >= 0) {
    promise->set(length);
    return;
  }

  const bool retry =
    errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK;

  if (!retry) {
    // Error occurred.
    promise->fail(strerror(errno));
    return;
  }

  // Restart the read operation once the descriptor is readable.
  Future<short> polling =
    io::poll(fd, process::io::READ).onAny(
        lambda::bind(&internal::read,
                     fd,
                     data,
                     size,
                     promise,
                     lambda::_1));

  // Stop polling if a discard occurs on our future.
  promise->future().onDiscard(
      lambda::bind(&process::internal::discard<short>,
                   WeakFuture<short>(polling)));
}


// Continuation that performs a single non-blocking write on 'fd' and
// completes 'promise' with the outcome.
//
// 'future' is the result of the poll that preceded this call. SIGPIPE
// is blocked around the write so that writing to a pipe or socket
// whose reading end is closed surfaces as a failure (EPIPE) on the
// promise. If the write would block (or was interrupted) a new poll
// is started with this function bound as its continuation.
void write(
    int fd,
    void* data,
    size_t size,
    const memory::shared_ptr<Promise<size_t> >& promise,
    const Future<short>& future)
{
  // Ignore this function if the write operation has been discarded.
  if (promise->future().hasDiscard()) {
    promise->discard();
    return;
  }

  if (size == 0) {
    promise->set(0);
    return;
  }

  if (future.isDiscarded()) {
    promise->fail("Failed to poll: discarded future");
  } else if (future.isFailed()) {
    promise->fail(future.failure());
  } else {
    // Do a write but ignore SIGPIPE so we can return an error when
    // writing to a pipe or socket where the reading end is closed.
    // TODO(benh): The 'suppress' macro failed to work on OS X as it
    // appears that signal delivery was happening asynchronously.
    // That is, the signal would not appear to be pending when the
    // 'suppress' block was closed thus the destructor for
    // 'Suppressor' was not waiting/removing the signal via 'sigwait'.
    // It also appeared that the signal would be delivered to another
    // thread even if it remained blocked in this thread. The
    // workaround here is to check explicitly for EPIPE and then do
    // 'sigwait' regardless of what 'os::signals::pending' returns. We
    // don't have that luxury with 'Suppressor' and arbitrary signals
    // because we don't always have something like EPIPE to tell us
    // that a signal is (or will soon be) pending.
    bool pending = os::signals::pending(SIGPIPE);
    bool unblock = !pending ? os::signals::block(SIGPIPE) : false;

    ssize_t length = ::write(fd, data, size);

    // Save the errno so we can restore it after doing sig* functions
    // below.
    int errno_ = errno;

    if (length < 0 && errno_ == EPIPE && !pending) {
      sigset_t mask;
      sigemptyset(&mask);
      sigaddset(&mask, SIGPIPE);

      // POSIX 'sigwait' reports failure by returning the error number
      // rather than by returning -1 and setting errno, so check the
      // return value for EINTR. The legacy '-1 && errno' form is kept
      // for platforms that deviate from POSIX.
      int result;
      do {
        int ignored;
        result = sigwait(&mask, &ignored);
      } while (result == EINTR || (result == -1 && errno == EINTR));
    }

    if (unblock) {
      os::signals::unblock(SIGPIPE);
    }

    errno = errno_;

    if (length < 0) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        // Restart the write operation.
        Future<short> future =
          io::poll(fd, process::io::WRITE).onAny(
              lambda::bind(&internal::write,
                           fd,
                           data,
                           size,
                           promise,
                           lambda::_1));

        // Stop polling if a discard occurs on our future.
        promise->future().onDiscard(
            lambda::bind(&process::internal::discard<short>,
                         WeakFuture<short>(future)));
      } else {
        // Error occurred.
        promise->fail(strerror(errno));
      }
    } else {
      // TODO(benh): Retry if 'length' is 0?
      promise->set(length);
    }
  }
}

} // namespace internal {


// Polls 'fd' for 'events' by running 'internal::poll' in the event
// loop and returning the resulting future.
Future<short> poll(int fd, short events)
{
  process::initialize();

  // TODO(benh): Check if the file descriptor is non-blocking?

  return run_in_event_loop<short>(
      lambda::bind(&internal::poll, fd, events));
}


// Reads up to 'size' bytes from 'fd' into 'data'. The returned future
// fails if 'fd' cannot be inspected or is not non-blocking.
Future<size_t> read(int fd, void* data, size_t size)
{
  process::initialize();

  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());

  // Check the file descriptor.
  Try<bool> nonblock = os::isNonblock(fd);

  if (nonblock.isError()) {
    // The file descriptor is not valid (e.g., has been closed).
    promise->fail(
        "Failed to check if file descriptor was non-blocking: " +
        nonblock.error());
  } else if (!nonblock.get()) {
    // The file descriptor is not non-blocking.
    promise->fail("Expected a non-blocking file descriptor");
  } else {
    // The descriptor is non-blocking, so attempt the read right away;
    // 'internal::read' falls back to polling only when necessary.
    // Besides avoiding unnecessary polling, this sidesteps polls that
    // were observed to block for non-deterministically long periods
    // with some combinations of libev and Linux kernel versions
    // (libev 3.8 at the time the original comment was written).
    internal::read(fd, data, size, promise, io::READ);
  }

  return promise->future();
}


// Writes up to 'size' bytes from 'data' to 'fd'. The returned future
// fails if 'fd' cannot be inspected or is not non-blocking.
Future<size_t> write(int fd, void* data, size_t size)
{
  process::initialize();

  memory::shared_ptr<Promise<size_t> > promise(new Promise<size_t>());

  // Check the file descriptor.
  Try<bool> nonblock = os::isNonblock(fd);

  if (nonblock.isError()) {
    // The file descriptor is not valid (e.g., has been closed).
    promise->fail(
        "Failed to check if file descriptor was non-blocking: " +
        nonblock.error());
  } else if (!nonblock.get()) {
    // The file descriptor is not non-blocking.
    promise->fail("Expected a non-blocking file descriptor");
  } else {
    // The descriptor is non-blocking, so attempt the write right
    // away; 'internal::write' falls back to polling only when
    // necessary. Besides avoiding unnecessary polling, this sidesteps
    // polls that were observed to block for non-deterministically
    // long periods with some combinations of libev and Linux kernel
    // versions (libev 3.8 at the time the original comment was
    // written).
    internal::write(fd, data, size, promise, io::WRITE);
  }

  return promise->future();
}


namespace internal {

#if __cplusplus >= 201103L
// Reads from 'fd' in chunks of up to 'length' bytes (using 'data' as
// scratch space), appending each chunk to 'buffer'. Once a read
// returns 0 bytes (EOF) the future is completed with a copy of the
// accumulated buffer.
Future<string> _read(
    int fd,
    const memory::shared_ptr<string>& buffer,
    const boost::shared_array<char>& data,
    size_t length)
{
  Future<size_t> chunk = io::read(fd, data.get(), length);

  return chunk.then([=] (size_t size) -> Future<string> {
    if (size > 0) {
      buffer->append(data.get(), size);
      return _read(fd, buffer, data, length);
    }

    // EOF.
    return string(*buffer);
  });
}
#else
// Forward declaration.
Future<string> _read(
    int fd,
    const memory::shared_ptr<string>& buffer,
    const boost::shared_array<char>& data,
    size_t length);


// Continuation of '_read': 'size' is the number of bytes the last
// read produced. Zero means EOF and yields a copy of 'buffer';
// otherwise the bytes are appended and reading continues.
Future<string> __read(
    size_t size,
    int fd,
    const memory::shared_ptr<string>& buffer,
    const boost::shared_array<char>& data,
    size_t length)
{
  if (size > 0) {
    buffer->append(data.get(), size);
    return _read(fd, buffer, data, length);
  }

  // EOF.
  return string(*buffer);
}


// Issues one read of up to 'length' bytes from 'fd' into 'data' and
// chains '__read' to process the result.
Future<string> _read(
    int fd,
    const memory::shared_ptr<string>& buffer,
    const boost::shared_array<char>& data,
    size_t length)
{
  Future<size_t> chunk = io::read(fd, data.get(), length);

  return chunk.then(
      lambda::bind(&__read, lambda::_1, fd, buffer, data, length));
}
#endif // __cplusplus >= 201103L


#if __cplusplus >= 201103L
// Writes the bytes of 'data' starting at offset 'index' to 'fd',
// re-invoking itself with an advanced offset until the running total
// equals the size of 'data'.
Future<Nothing> _write(
    int fd,
    Owned<string> data,
    size_t index)
{
  char* start = const_cast<char*>(data->data()) + index;
  const size_t remaining = data->size() - index;

  return io::write(fd, start, remaining)
    .then([=] (size_t length) -> Future<Nothing> {
      const size_t written = index + length;
      if (written == data->size()) {
        return Nothing();
      }
      return _write(fd, data, written);
    });
}
#else
// Forward declaration.
Future<Nothing> _write(
    int fd,
    Owned<string> data,
    size_t index);


// Continuation of '_write': 'length' is the number of bytes the last
// write consumed, starting at offset 'index'. Finishes once the whole
// of 'data' has been written, otherwise continues from the new offset.
Future<Nothing> __write(
    int fd,
    Owned<string> data,
    size_t index,
    size_t length)
{
  const size_t written = index + length;

  if (written == data->size()) {
    return Nothing();
  }

  return _write(fd, data, written);
}


// Issues one write of the bytes of 'data' from offset 'index' onwards
// and chains '__write' to process the result.
Future<Nothing> _write(
    int fd,
    Owned<string> data,
    size_t index)
{
  char* start = const_cast<char*>(data->data()) + index;
  const size_t remaining = data->size() - index;

  return io::write(fd, start, remaining)
    .then(lambda::bind(&__write, fd, data, index, lambda::_1));
}
#endif // __cplusplus >= 201103L


#if __cplusplus >= 201103L
// Performs one read/write round of a splice from 'from' to 'to':
// reads up to 'chunk' bytes into 'data', writes what was read, and on
// a successful write invokes itself again. 'promise' is set on EOF,
// failed when a read or write fails, and discarded when a discard
// was requested or a read/write got discarded.
void _splice(
    int from,
    int to,
    size_t chunk,
    boost::shared_array<char> data,
    memory::shared_ptr<Promise<Nothing>> promise)
{
  // Stop splicing if a discard occurred on our future.
  if (promise->future().hasDiscard()) {
    // TODO(benh): Consider returning the number of bytes already
    // spliced on discarded, or a failure. Same for the 'onDiscarded'
    // callbacks below.
    promise->discard();
    return;
  }

  // Note that only one of io::read or io::write is outstanding at any
  // one point in time thus the reuse of 'data' for both operations.

  Future<size_t> read = io::read(from, data.get(), chunk);

  // Stop reading (or potentially indefinitely polling) if a discard
  // occurs on our future.
  promise->future().onDiscard(
      lambda::bind(&process::internal::discard<size_t>,
                   WeakFuture<size_t>(read)));

  read
    .onReady([=] (size_t size) {
      if (size == 0) { // EOF.
        promise->set(Nothing());
      } else {
        // Note that we always try and complete the write, even if a
        // discard has occurred on our future, in order to provide
        // semantics where everything read is written. The promise
        // will eventually be discarded in the next read.
        io::write(to, string(data.get(), size))
          .onReady([=] () { _splice(from, to, chunk, data, promise); })
          .onFailed([=] (const string& message) { promise->fail(message); })
          .onDiscarded([=] () { promise->discard(); });
      }
    })
    .onFailed([=] (const string& message) { promise->fail(message); })
    .onDiscarded([=] () { promise->discard(); });
}
#else
// Forward declarations.
void __splice(
    int from,
    int to,
    size_t chunk,
    boost::shared_array<char> data,
    memory::shared_ptr<Promise<Nothing> > promise,
    size_t size);

void ___splice(
    memory::shared_ptr<Promise<Nothing> > promise,
    const string& message);

void ____splice(
    memory::shared_ptr<Promise<Nothing> > promise);


void _splice(
    int from,
    int to,
    size_t chunk,
    boost::shared_array<char> data,
    memory::shared_ptr<Promise<Nothing> > promise)
{
  // Stop splicing if a discard occured on our future.
  if (promise->future().hasDiscard()) {
    // TODO(benh): Consider returning the number of bytes already
    // spliced on discarded, or a failure. Same for the 'onDiscarded'
    // callbacks below.
    promise->discard();
    return;
  }

  Future<size_t> read = io::read(from, data.get(), chunk);

  // Stop reading (or potentially indefinitely polling) if a discard
  // occurs on our future.
  promise->future().onDiscard(
      lambda::bind(&process::internal::discard<size_t>,
                   WeakFuture<size_t>(read)));

  read
    .onReady(
        lambda::bind(&__splice, from, to, chunk, data, promise, lambda::_1))
    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
    .onDiscarded(lambda::bind(&____splice, promise));
}


// Ready-continuation of the read issued by '_splice'. 'size' is the
// number of bytes read: zero (EOF) completes 'promise', otherwise the
// bytes are written to 'to' and the splice continues once the write
// is ready.
void __splice(
    int from,
    int to,
    size_t chunk,
    boost::shared_array<char> data,
    memory::shared_ptr<Promise<Nothing> > promise,
    size_t size)
{
  if (size == 0) { // EOF.
    promise->set(Nothing());
    return;
  }

  // The write is always attempted, even if a discard has occurred on
  // our future, so that everything read also gets written. The
  // promise will eventually be discarded in the next read.
  io::write(to, string(data.get(), size))
    .onReady(lambda::bind(&_splice, from, to, chunk, data, promise))
    .onFailed(lambda::bind(&___splice, promise, lambda::_1))
    .onDiscarded(lambda::bind(&____splice, promise));
}


// Failure-continuation used by the splice helpers: fails 'promise'
// with 'message'.
void ___splice(
    memory::shared_ptr<Promise<Nothing> > promise,
    const string& message)
{
  Promise<Nothing>& target = *promise;
  target.fail(message);
}


// Discard-continuation used by the splice helpers: discards
// 'promise'.
void ____splice(
    memory::shared_ptr<Promise<Nothing> > promise)
{
  Promise<Nothing>& target = *promise;
  target.discard();
}
#endif // __cplusplus >= 201103L


// Splices data from 'from' to 'to' through a buffer of 'chunk' bytes
// and returns a future for the promise handed to '_splice'.
Future<Nothing> splice(int from, int to, size_t chunk)
{
  // An explicit promise is passed around (instead of having
  // internal::_splice return a future built from a chain of io::read
  // and io::write calls) so that memory usage does not grow the
  // longer we splice.
  memory::shared_ptr<Promise<Nothing> > promise(new Promise<Nothing>());

  Future<Nothing> future = promise->future();

  // Scratch buffer shared by every read and write of this splice.
  boost::shared_array<char> data(new char[chunk]);

  _splice(from, to, chunk, data, promise);

  return future;
}

} // namespace internal {


// Reads from 'fd' until EOF and returns everything read as a string.
//
// The work happens on a duplicate of 'fd' that is made close-on-exec
// and non-blocking and that is closed once the returned future
// completes. A failure is returned if 'fd' is negative or if
// preparing the duplicate fails.
Future<string> read(int fd)
{
  process::initialize();

  // Operating on our own copy of the file descriptor puts us in
  // control of its lifetime, so nothing breaks if someone accidentally
  // closes the original before discarding this future. Start by
  // checking we've got a "valid" file descriptor before dup'ing.
  if (fd < 0) {
    return Failure(strerror(EBADF));
  }

  const int owned = dup(fd);
  if (owned == -1) {
    return Failure(ErrnoError("Failed to duplicate file descriptor"));
  }

  // Set the close-on-exec flag.
  Try<Nothing> cloexec = os::cloexec(owned);
  if (cloexec.isError()) {
    os::close(owned);
    return Failure(
        "Failed to set close-on-exec on duplicated file descriptor: " +
        cloexec.error());
  }

  // Make the file descriptor non-blocking.
  Try<Nothing> nonblock = os::nonblock(owned);
  if (nonblock.isError()) {
    os::close(owned);
    return Failure(
        "Failed to make duplicated file descriptor non-blocking: " +
        nonblock.error());
  }

  // TODO(benh): Wrap up this data as a struct, use 'Owner'.
  // TODO(bmahler): For efficiency, use a rope for the buffer.
  memory::shared_ptr<string> buffer(new string());
  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);

  // Close our duplicate no matter how the read ends.
  return internal::_read(owned, buffer, data, BUFFERED_READ_SIZE)
    .onAny(lambda::bind(&os::close, owned));
}


// Writes all of 'data' to 'fd'.
//
// The work happens on a duplicate of 'fd' that is made close-on-exec
// and non-blocking and that is closed once the returned future
// completes. A failure is returned if 'fd' is negative or if
// preparing the duplicate fails.
Future<Nothing> write(int fd, const std::string& data)
{
  process::initialize();

  // Operating on our own copy of the file descriptor puts us in
  // control of its lifetime, so nothing breaks if someone accidentally
  // closes the original before discarding this future. Start by
  // checking we've got a "valid" file descriptor before dup'ing.
  if (fd < 0) {
    return Failure(strerror(EBADF));
  }

  const int owned = dup(fd);
  if (owned == -1) {
    return Failure(ErrnoError("Failed to duplicate file descriptor"));
  }

  // Set the close-on-exec flag.
  Try<Nothing> cloexec = os::cloexec(owned);
  if (cloexec.isError()) {
    os::close(owned);
    return Failure(
        "Failed to set close-on-exec on duplicated file descriptor: " +
        cloexec.error());
  }

  // Make the file descriptor non-blocking.
  Try<Nothing> nonblock = os::nonblock(owned);
  if (nonblock.isError()) {
    os::close(owned);
    return Failure(
        "Failed to make duplicated file descriptor non-blocking: " +
        nonblock.error());
  }

  // The write works on its own copy of the payload.
  Owned<string> payload(new string(data));

  // Close our duplicate no matter how the write ends.
  return internal::_write(owned, payload, 0)
    .onAny(lambda::bind(&os::close, owned));
}


// Redirects everything readable from 'from' into 'to' (or into
// /dev/null when 'to' is none), splicing in chunks of 'chunk' bytes.
//
// Both ends are duplicated (or freshly opened for /dev/null), made
// close-on-exec and non-blocking, and closed once the splice
// completes. A failure is returned if either descriptor is negative
// or if preparing the descriptors fails; descriptors acquired up to
// that point are closed before returning.
Future<Nothing> redirect(int from, Option<int> to, size_t chunk)
{
  // Make sure we've got "valid" file descriptors.
  if (from < 0 || (to.isSome() && to.get() < 0)) {
    return Failure(strerror(EBADF));
  }

  if (to.isNone()) {
    // Open up /dev/null that we can splice into.
    Try<int> open = os::open("/dev/null", O_WRONLY);

    if (open.isError()) {
      return Failure("Failed to open /dev/null for writing: " + open.error());
    }

    to = open.get();
  } else {
    // Duplicate 'to' so that we're in control of its lifetime.
    int fd = dup(to.get());
    if (fd == -1) {
      return Failure(ErrnoError("Failed to duplicate 'to' file descriptor"));
    }

    to = fd;
  }

  CHECK_SOME(to);

  // Duplicate 'from' so that we're in control of its lifetime.
  from = dup(from);
  if (from == -1) {
    // We already own 'to' at this point, so it must be closed to
    // avoid leaking it. Save and restore errno around the close so
    // that the reported error still describes the failed 'dup'.
    int errno_ = errno;
    os::close(to.get());
    errno = errno_;
    return Failure(ErrnoError("Failed to duplicate 'from' file descriptor"));
  }

  // Set the close-on-exec flag (no-op if already set).
  Try<Nothing> cloexec = os::cloexec(from);
  if (cloexec.isError()) {
    os::close(from);
    os::close(to.get());
    return Failure("Failed to set close-on-exec on 'from': " + cloexec.error());
  }

  cloexec = os::cloexec(to.get());
  if (cloexec.isError()) {
    os::close(from);
    os::close(to.get());
    return Failure("Failed to set close-on-exec on 'to': " + cloexec.error());
  }

  // Make the file descriptors non-blocking (no-op if already set).
  Try<Nothing> nonblock = os::nonblock(from);
  if (nonblock.isError()) {
    os::close(from);
    os::close(to.get());
    return Failure("Failed to make 'from' non-blocking: " + nonblock.error());
  }

  nonblock = os::nonblock(to.get());
  if (nonblock.isError()) {
    os::close(from);
    os::close(to.get());
    return Failure("Failed to make 'to' non-blocking: " + nonblock.error());
  }

  // Close both of our descriptors no matter how the splice ends.
  return internal::splice(from, to.get(), chunk)
    .onAny(lambda::bind(&os::close, from))
    .onAny(lambda::bind(&os::close, to.get()));
}

} // namespace io {


namespace inject {

// Delivers a new ExitedEvent for 'from' to the process 'to' and
// returns what the process manager's 'deliver' reports.
bool exited(const UPID& from, const UPID& to)
{
  process::initialize();

  return process_manager->deliver(to, new ExitedEvent(from), __process__);
}

} // namespace inject {


namespace internal {

// Wraps 'f' (and 'method') in a new DispatchEvent and delivers it to
// 'pid' through the process manager.
void dispatch(
    const UPID& pid,
    const memory::shared_ptr<lambda::function<void(ProcessBase*)> >& f,
    const string& method)
{
  process::initialize();

  process_manager->deliver(
      pid,
      new DispatchEvent(pid, f, method),
      __process__);
}

} // namespace internal {
} // namespace process {
