| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License |
| |
| #ifndef __PROCESS_PROCESS_HPP__ |
| #define __PROCESS_PROCESS_HPP__ |
| |
| #include <stdint.h> |
| |
| #include <memory> |
| #include <map> |
| #include <queue> |
| #include <vector> |
| |
| #include <process/address.hpp> |
| #include <process/authenticator.hpp> |
| #include <process/clock.hpp> |
| #include <process/event.hpp> |
| #include <process/filter.hpp> |
| #include <process/firewall.hpp> |
| #include <process/http.hpp> |
| #include <process/message.hpp> |
| #include <process/mime.hpp> |
| #include <process/owned.hpp> |
| #include <process/pid.hpp> |
| |
| #include <stout/duration.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/option.hpp> |
| #include <stout/synchronized.hpp> |
| |
| namespace process { |
| |
| // Forward declaration. |
| class EventQueue; |
| class Gate; |
| class Logging; |
| class Sequence; |
| |
| namespace firewall { |
| |
| /** |
| * Install a list of firewall rules which are used to forbid incoming |
| * HTTP requests. |
| * |
| * The rules will be applied in the provided order to each incoming |
| * request. If any rule forbids the request, no more rules are applied |
| * and a "403 Forbidden" response will be returned containing the reason |
| * from the rule. |
| * |
| * **NOTE**: if a request is forbidden, the request's handler is |
| * not notified. |
| * |
| * @see process::firewall::FirewallRule |
| * |
| * @param rules List of rules which will be applied to all incoming |
| * HTTP requests. |
| */ |
| void install(std::vector<Owned<FirewallRule>>&& rules); |
| |
| } // namespace firewall { |
| |
| class ProcessBase : public EventConsumer |
| { |
| public: |
| explicit ProcessBase(const std::string& id = ""); |
| |
| ~ProcessBase() override; |
| |
| const UPID& self() const { return pid; } |
| |
| protected: |
| /** |
| * Invoked when an event is serviced. |
| */ |
| virtual void serve(Event&& event) |
| { |
| std::move(event).consume(this); |
| } |
| |
| // Callbacks used to consume (i.e., handle) a specific event. |
| void consume(MessageEvent&& event) override; |
| void consume(DispatchEvent&& event) override; |
| void consume(HttpEvent&& event) override; |
| void consume(ExitedEvent&& event) override; |
| void consume(TerminateEvent&& event) override; |
| |
| /** |
| * Invoked when a process gets spawned. |
| */ |
| virtual void initialize() {} |
| |
| /** |
| * Invoked when a process is terminated. |
| * |
| * **NOTE**: this does not get invoked automatically if |
| * `process::ProcessBase::consume(TerminateEvent&&)` is overridden. |
| */ |
| virtual void finalize() {} |
| |
| /** |
| * Invoked when a linked process has exited. |
| * |
| * For local linked processes (i.e., when the linker and linkee are |
| * part of the same OS process), this can be used to reliably detect |
| * when the linked process has exited. |
| * |
| * For remote linked processes, this indicates that the persistent |
| * TCP connection between the linker and the linkee has failed |
| * (e.g., linkee process died, a network error occurred). In this |
| * situation, the remote linkee process might still be running. |
| * |
| * @see process::ProcessBase::link |
| */ |
| virtual void exited(const UPID&) {} |
| |
| /** |
| * Invoked when a linked process can no longer be monitored. |
| * |
| * TODO(neilc): This is not implemented. |
| * |
| * @see process::ProcessBase::link |
| */ |
| virtual void lost(const UPID&) {} |
| |
| /** |
| * Sends the message to the specified `UPID`. Prefer the rvalue |
| * reference overloads if the data can be moved in. |
| * |
| * @see process::Message |
| */ |
| void send( |
| const UPID& to, |
| const std::string& name, |
| const char* data = nullptr, |
| size_t length = 0); |
| |
| void send( |
| const UPID& to, |
| std::string&& name); |
| |
| void send( |
| const UPID& to, |
| std::string&& name, |
| std::string&& data); |
| |
| /** |
| * Describes the behavior of the `link` call when the target `pid` |
| * points to a remote process. This enum has no effect if the target |
| * `pid` points to a local process. |
| */ |
| enum class RemoteConnection |
| { |
| /** |
| * If a persistent socket to the target `pid` does not exist, |
| * a new link is created. If a persistent socket already exists, |
| * `link` will subscribe this process to the existing link. |
| * |
| * This is the default behavior. |
| */ |
| REUSE, |
| |
| /** |
| * If a persistent socket to the target `pid` does not exist, |
| * a new link is created. If a persistent socket already exists, |
| * `link` create a new socket connection with the target `pid` |
| * and *atomically* swap the existing link with the new link. |
| * |
| * Existing linkers will remain linked, albeit via the new socket. |
| */ |
| RECONNECT, |
| }; |
| |
| /** |
| * Links with the specified `UPID`. |
| * |
| * Linking with a process from within the same OS process is |
| * guaranteed to give you perfect monitoring of that process. |
| * |
| * Linking to a remote process establishes a persistent TCP |
| * connection to the remote libprocess instance that hosts that |
| * process. If the TCP connection fails, the true state of the |
| * remote linked process cannot be determined; we handle this |
| * situation by generating an ExitedEvent. |
| */ |
| UPID link( |
| const UPID& pid, |
| const RemoteConnection remote = RemoteConnection::REUSE); |
| |
| /** |
| * Any function which takes a "from" `UPID` and a message body as |
| * arguments. |
| * |
| * The default consume implementation for message events invokes |
| * installed message handlers, or delegates the message to another |
| * process. A message handler always takes precedence over delegating. |
| * |
| * @see process::ProcessBase::install |
| * @see process::ProcessBase::delegate |
| */ |
| typedef lambda::function<void(const UPID&, const std::string&)> |
| MessageHandler; |
| |
| /** |
| * Sets up a handler for messages with the specified name. |
| */ |
| void install( |
| const std::string& name, |
| const MessageHandler& handler) |
| { |
| handlers.message[name] = handler; |
| } |
| |
| /** |
| * @copydoc process::ProcessBase::install |
| */ |
| template <typename T> |
| void install( |
| const std::string& name, |
| void (T::*method)(const UPID&, const std::string&)) |
| { |
| // Note that we use dynamic_cast here so a process can use |
| // multiple inheritance if it sees so fit (e.g., to implement |
| // multiple callback interfaces). |
| MessageHandler handler = |
| lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2); |
| install(name, handler); |
| } |
| |
| /** |
| * Delegates incoming messages, with the specified name, to the `UPID`. |
| */ |
| void delegate(const std::string& name, const UPID& pid) |
| { |
| delegates[name] = pid; |
| } |
| |
| /** |
| * Any function which takes a `process::http::Request` and returns a |
| * `process::http::Response`. |
| * |
| * The default consume implementation for HTTP events invokes |
| * installed HTTP handlers. |
| * |
| * @see process::ProcessBase::route |
| */ |
| typedef lambda::function<Future<http::Response>(const http::Request&)> |
| HttpRequestHandler; |
| |
| // Options to control the behavior of a route. |
| struct RouteOptions |
| { |
| RouteOptions() |
| : requestStreaming(false) {} |
| |
| // Set to true if the endpoint supports request streaming. |
| // Default: false. |
| bool requestStreaming; |
| }; |
| |
| /** |
| * Sets up a handler for HTTP requests with the specified name. |
| * |
| * @param name The endpoint or URL to route. |
| * Must begin with a `/` and must not end with a '/'. |
| */ |
| void route( |
| const std::string& name, |
| const Option<std::string>& help, |
| const HttpRequestHandler& handler, |
| const RouteOptions& options = RouteOptions()); |
| |
| /** |
| * @copydoc process::ProcessBase::route |
| */ |
| template <typename T> |
| void route( |
| const std::string& name, |
| const Option<std::string>& help, |
| Future<http::Response> (T::*method)(const http::Request&), |
| const RouteOptions& options = RouteOptions()) |
| { |
| // Note that we use dynamic_cast here so a process can use |
| // multiple inheritance if it sees so fit (e.g., to implement |
| // multiple callback interfaces). |
| HttpRequestHandler handler = |
| lambda::bind(method, dynamic_cast<T*>(this), lambda::_1); |
| route(name, help, handler, options); |
| } |
| |
| /** |
| * Any function which takes a `process::http::Request` and an |
| * `Option<Principal>` and returns a `process::http::Response`. |
| * This type is meant to be used for the endpoint handlers of |
| * authenticated HTTP endpoints. |
| * |
| * If the handler is called and the principal is set, |
| * this implies two things: |
| * 1) The realm that the handler's endpoint is installed into |
| * requires authentication. |
| * 2) The HTTP request has been successfully authenticated. |
| * |
| * If the principal is not set, then the endpoint's |
| * realm does not require authentication. |
| * |
| * The default consume implementation for HTTP events invokes |
| * installed HTTP handlers. |
| * |
| * @see process::ProcessBase::route |
| */ |
| typedef lambda::function<Future<http::Response>( |
| const http::Request&, |
| const Option<http::authentication::Principal>&)> |
| AuthenticatedHttpRequestHandler; |
| |
| // TODO(arojas): Consider introducing an `authentication::Realm` type. |
| // TODO(bevers): Consider changing the type of the second argument to |
| // `const Option<std::string>&` for consistency with the version below. |
| void route( |
| const std::string& name, |
| const std::string& realm, |
| const Option<std::string>& help, |
| const AuthenticatedHttpRequestHandler& handler, |
| const RouteOptions& options = RouteOptions()); |
| |
| /** |
| * Forwards to the correct overload of `process::ProcessBase::route()`, |
| * depending on whether the authentication realm `realm` is present. |
| */ |
| template<typename T> |
| void route( |
| const std::string& name, |
| const Option<std::string>& realm, |
| const Option<std::string>& help, |
| Future<http::Response>(T::*method)( |
| const http::Request&, const Option<http::authentication::Principal>&), |
| const RouteOptions& options = RouteOptions()) |
| { |
| // Note that we use dynamic_cast here so a process can use |
| // multiple inheritance if it sees so fit (e.g., to implement |
| // multiple callback interfaces). |
| if (realm.isSome()) { |
| AuthenticatedHttpRequestHandler handler = |
| lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2); |
| route(name, realm.get(), help, handler, options); |
| } else { |
| HttpRequestHandler handler = |
| lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, None()); |
| route(name, help, handler, options); |
| } |
| } |
| |
| /** |
| * Sets up the default HTTP request handler to provide the static |
| * asset(s) at the specified _absolute_ path for the specified name. |
| * |
| * For example, assuming the process named "server" invoked |
| * `provide("name", "path")`, then an HTTP request for `/server/name` |
| * would return the asset found at "path". If the specified path is a |
| * directory then an HTTP request for `/server/name/file` would return |
| * the asset found at `/path/file`. |
| * |
| * The `Content-Type` header of the HTTP response will be set to the |
| * specified type given the file extension, which can be changed via |
| * the optional `types` parameter. |
| */ |
| void provide( |
| const std::string& name, |
| const std::string& path, |
| const std::map<std::string, std::string>& types = mime::types) |
| { |
| // TODO(benh): Check that name is only alphanumeric (i.e., has no |
| // '/') and that path is absolute. |
| Asset asset; |
| asset.path = path; |
| asset.types = types; |
| assets[name] = asset; |
| } |
| |
| /** |
| * Returns the number of events of the given type currently on the |
| * event queue. MUST be invoked from within the process itself in |
| * order to safely examine events. |
| */ |
| template <typename T> |
| size_t eventCount(); |
| |
| private: |
| friend class SocketManager; |
| friend class ProcessManager; |
| friend void* schedule(void*); |
| |
| // Process states. |
| // |
| // Transitioning from BLOCKED to READY also requires enqueueing the |
| // process in the run queue otherwise the events will never be |
| // processed! |
| enum class State |
| { |
| BOTTOM, // Uninitialized but events may be enqueued. |
| BLOCKED, // Initialized, no events enqueued. |
| READY, // Initialized, events enqueued. |
| TERMINATING // Initialized, no more events will be enqueued. |
| }; |
| |
| std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM); |
| |
| // Flag for indicating that a terminate event has been injected. |
| std::atomic<bool> termination = ATOMIC_VAR_INIT(false); |
| |
| // Enqueue the specified message, request, or function call. |
| // Returns false if not enqueued (i.e. the process is terminating). |
| // In this case the caller retains ownership of the event. |
| // Should not be called directly, callers should go through |
| // `ProcessManager::deliver(...)`. |
| bool enqueue(Event* event); |
| |
| // Delegates for messages. |
| std::map<std::string, UPID> delegates; |
| |
| // Definition of an HTTP endpoint. The endpoint can be |
| // associated with an authentication realm, in which case: |
| // |
| // (1) `realm` and `authenticatedHandler` will be set. |
| // Libprocess will perform HTTP authentication for |
| // all requests to this endpoint (by default during |
| // HttpEvent consumption). The authentication principal |
| // will be passed to the `authenticatedHandler`. |
| // |
| // Otherwise, if the endpoint is not associated with an |
| // authentication realm: |
| // |
| // (2) Only `handler` will be set, and no authentication |
| // takes place. |
| struct HttpEndpoint |
| { |
| Option<HttpRequestHandler> handler; |
| |
| Option<std::string> realm; |
| Option<AuthenticatedHttpRequestHandler> authenticatedHandler; |
| RouteOptions options; |
| }; |
| |
| // Handlers for messages and HTTP requests. |
| struct { |
| hashmap<std::string, MessageHandler> message; |
| hashmap<std::string, HttpEndpoint> http; |
| |
| // Used for delivering HTTP requests in the correct order. |
| // Initialized lazily to avoid ProcessBase requiring |
| // another Process! |
| Owned<Sequence> httpSequence; |
| } handlers; |
| |
| // Definition of a static asset. |
| struct Asset |
| { |
| std::string path; |
| std::map<std::string, std::string> types; |
| }; |
| |
| // Continuation for `consume(HttpEvent&&)`. |
| Future<http::Response> _consume( |
| const HttpEndpoint& endpoint, |
| const std::string& name, |
| const Owned<http::Request>& request); |
| |
| // JSON representation of process. MUST be invoked from within the |
| // process itself in order to safely examine events. |
| operator JSON::Object(); |
| |
| // Static assets(s) to provide. |
| std::map<std::string, Asset> assets; |
| |
| // Queue of received events. We employ the PIMPL idiom here and use |
| // a pointer so we can hide the implementation of `EventQueue`. |
| std::unique_ptr<EventQueue> events; |
| |
| // NOTE: this is a shared pointer to a _pointer_, hence this is not |
| // responsible for the ProcessBase itself. |
| std::shared_ptr<ProcessBase*> reference; |
| |
| std::shared_ptr<Gate> gate; |
| |
| // Whether or not the runtime should delete this process after it |
| // has terminated. Note that failure to spawn the process will leave |
| // the process unmanaged and thus it may leak! |
| bool manage = false; |
| |
| // Process PID. |
| UPID pid; |
| }; |
| |
| |
| template <typename T> |
| class Process : public virtual ProcessBase { |
| public: |
| ~Process() override {} |
| |
| /** |
| * Returns the `PID` of the process. |
| * |
| * Valid even before calling spawn. |
| */ |
| PID<T> self() const { return PID<T>(static_cast<const T*>(this)); } |
| |
| protected: |
| // Useful typedefs for dispatch/delay/defer to self()/this. |
| typedef T Self; |
| typedef T This; |
| }; |
| |
| |
| /** |
| * Initialize the library. |
| * |
| * **NOTE**: `libprocess` uses Google's `glog` and you can specify options |
| * for it (e.g., a logging directory) via environment variables. |
| * |
| * @param delegate Process to receive root HTTP requests. |
| * @param readwriteAuthenticationRealm The authentication realm that read-write |
| * libprocess-level HTTP endpoints will be installed under, if any. |
| * If this realm is not specified, read-write endpoints will be installed |
| * without authentication. |
| * @param readonlyAuthenticationRealm The authentication realm that read-only |
| * libprocess-level HTTP endpoints will be installed under, if any. |
| * If this realm is not specified, read-only endpoints will be installed |
| * without authentication. |
| * @return `true` if this was the first invocation of `process::initialize()`, |
| * or `false` if it was not the first invocation. |
| * |
| * @see [glog](https://google-glog.googlecode.com/svn/trunk/doc/glog.html) |
| */ |
| bool initialize( |
| const Option<std::string>& delegate = None(), |
| const Option<std::string>& readwriteAuthenticationRealm = None(), |
| const Option<std::string>& readonlyAuthenticationRealm = None()); |
| |
| |
| /** |
| * Clean up the library. |
| * |
| * @param finalize_wsa Whether the Windows socket stack should be cleaned |
| * up for the entire process. Has no effect outside of Windows. |
| */ |
| void finalize(bool finalize_wsa = false); |
| |
| |
| /** |
| * Get the request absolutePath path with delegate prefix. |
| */ |
| std::string absolutePath(const std::string& path); |
| |
| |
| /** |
| * Returns the socket address associated with this instance of the library. |
| */ |
| network::inet::Address address(); |
| |
| |
| /** |
| * Return the PID associated with the global logging process. |
| */ |
| PID<Logging> logging(); |
| |
| |
| /** |
| * Returns the number of worker threads the library has created. A |
| * worker thread is a thread that runs a process (i.e., calls |
| * `ProcessBase::serve`). |
| */ |
| long workers(); |
| |
| |
| /** |
| * Spawn a new process. |
| * |
| * @param process Process to be spawned. |
| * @param manage Whether process should get deleted by the runtime |
| * after terminating. |
| */ |
| UPID spawn(ProcessBase* process, bool manage = false); |
| |
| inline UPID spawn(ProcessBase& process, bool manage = false) |
| { |
| return spawn(&process, manage); |
| } |
| |
| template <typename T> |
| PID<T> spawn(T* t, bool manage = false) |
| { |
| // We save the pid before spawn is called because it's possible that |
| // the process has already been deleted after spawn returns (e.g., |
| // if 'manage' is true). |
| PID<T> pid(t); |
| |
| if (!spawn(static_cast<ProcessBase*>(t), manage)) { |
| return PID<T>(); |
| } |
| |
| return pid; |
| } |
| |
| template <typename T> |
| PID<T> spawn(T& t, bool manage = false) |
| { |
| return spawn(&t, manage); |
| } |
| |
| |
| /** |
| * Sends a `TerminateEvent` to the given process. |
| * |
| * **NOTE**: currently, terminate only works for local processes (in the |
| * future we plan to make this more explicit via the use of a `PID` |
| * instead of a `UPID`). |
| * |
| * @param pid The process to terminate. |
| * @param inject Whether the message should be injected ahead of all other |
| * messages queued up for that process. |
| * |
| * @see process::TerminateEvent |
| */ |
| void terminate(const UPID& pid, bool inject = true); |
| void terminate(const ProcessBase& process, bool inject = true); |
| void terminate(const ProcessBase* process, bool inject = true); |
| |
| |
| /** |
| * Wait for the process to exit for no more than the specified seconds. |
| * |
| * @param PID ID of the process. |
| * @param secs Max time to wait, 0 implies wait forever. |
| * |
| * @return true if a process was actually waited upon. |
| */ |
| bool wait(const UPID& pid, const Duration& duration = Seconds(-1)); |
| bool wait(const ProcessBase& process, const Duration& duration = Seconds(-1)); |
| bool wait(const ProcessBase* process, const Duration& duration = Seconds(-1)); |
| |
| |
| /** |
| * Sends a message with data without a return address. |
| * |
| * @param to Receiver of the message. |
| * @param name Name of the message. |
| * @param data Data to send (gets copied). |
| * @param length Length of data. |
| */ |
| void post(const UPID& to, |
| const std::string& name, |
| const char* data = nullptr, |
| size_t length = 0); |
| |
| |
| void post(const UPID& from, |
| const UPID& to, |
| const std::string& name, |
| const char* data = nullptr, |
| size_t length = 0); |
| |
| |
| /** |
| * @copydoc process::terminate |
| */ |
| inline void terminate(const ProcessBase& process, bool inject) |
| { |
| terminate(process.self(), inject); |
| } |
| |
| |
| /** |
| * @copydoc process::terminate |
| */ |
| inline void terminate(const ProcessBase* process, bool inject) |
| { |
| terminate(process->self(), inject); |
| } |
| |
| |
| /** |
| * @copydoc process::wait |
| */ |
| inline bool wait(const ProcessBase& process, const Duration& duration) |
| { |
| return process::wait(process.self(), duration); // Explicit to disambiguate. |
| } |
| |
| |
| /** |
| * @copydoc process::wait |
| */ |
| inline bool wait(const ProcessBase* process, const Duration& duration) |
| { |
| return process::wait(process->self(), duration); // Explicit to disambiguate. |
| } |
| |
| |
| // Per thread process pointer. |
| extern thread_local ProcessBase* __process__; |
| |
| // NOTE: Methods in this namespace should only be used in tests to |
| // inject arbitrary events. |
| namespace inject { |
| |
| /** |
| * Simulates disconnection of the link between 'from' and 'to' by |
| * sending an `ExitedEvent` to 'to'. |
| * |
| * @see process::ExitedEvent |
| */ |
| bool exited(const UPID& from, const UPID& to); |
| |
| } // namespace inject { |
| |
| } // namespace process { |
| |
| #endif // __PROCESS_PROCESS_HPP__ |