blob: b8ff3deae0159142b7a2acbf0bfccbdc8ad2a891 [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#ifndef __WINDOWS__
#include <arpa/inet.h>
#endif // __WINDOWS__
#include <stdint.h>
#include <stdlib.h>
#include <algorithm>
#include <cctype>
#include <cstring>
#include <deque>
#include <iomanip>
#include <ostream>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <sstream>
#include <tuple>
#include <vector>
#include <process/after.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/io.hpp>
#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/queue.hpp>
#include <process/socket.hpp>
#include <process/state_machine.hpp>
#include <process/ssl/tls_config.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/ip.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/strings.hpp>
#include <stout/synchronized.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
#include "decoder.hpp"
#include "encoder.hpp"
using std::deque;
using std::istringstream;
using std::list;
using std::map;
using std::ostream;
using std::ostringstream;
using std::queue;
using std::string;
using std::tuple;
using std::vector;
using process::http::Request;
using process::http::Response;
namespace inet4 = process::network::inet4;
using process::network::inet::Address;
using process::network::inet::Socket;
using process::network::internal::SocketImpl;
namespace process {
namespace http {
struct StatusDescription {
uint16_t code;
const char* description;
};
// Status code reason strings, from the HTTP1.1 RFC:
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html
StatusDescription statuses[] = {
{100, "100 Continue"},
{101, "101 Switching Protocols"},
{200, "200 OK"},
{201, "201 Created"},
{202, "202 Accepted"},
{203, "203 Non-Authoritative Information"},
{204, "204 No Content"},
{205, "205 Reset Content"},
{206, "206 Partial Content"},
{300, "300 Multiple Choices"},
{301, "301 Moved Permanently"},
{302, "302 Found"},
{303, "303 See Other"},
{304, "304 Not Modified"},
{305, "305 Use Proxy"},
{307, "307 Temporary Redirect"},
{400, "400 Bad Request"},
{401, "401 Unauthorized"},
{402, "402 Payment Required"},
{403, "403 Forbidden"},
{404, "404 Not Found"},
{405, "405 Method Not Allowed"},
{406, "406 Not Acceptable"},
{407, "407 Proxy Authentication Required"},
{408, "408 Request Time-out"},
{409, "409 Conflict"},
{410, "410 Gone"},
{411, "411 Length Required"},
{412, "412 Precondition Failed"},
{413, "413 Request Entity Too Large"},
{414, "414 Request-URI Too Large"},
{415, "415 Unsupported Media Type"},
{416, "416 Requested range not satisfiable"},
{417, "417 Expectation Failed"},
{500, "500 Internal Server Error"},
{501, "501 Not Implemented"},
{502, "502 Bad Gateway"},
{503, "503 Service Unavailable"},
{504, "504 Gateway Time-out"},
{505, "505 HTTP Version not supported"}
};
const uint16_t Status::CONTINUE = 100;
const uint16_t Status::SWITCHING_PROTOCOLS = 101;
const uint16_t Status::OK = 200;
const uint16_t Status::CREATED = 201;
const uint16_t Status::ACCEPTED = 202;
const uint16_t Status::NON_AUTHORITATIVE_INFORMATION = 203;
const uint16_t Status::NO_CONTENT = 204;
const uint16_t Status::RESET_CONTENT = 205;
const uint16_t Status::PARTIAL_CONTENT = 206;
const uint16_t Status::MULTIPLE_CHOICES = 300;
const uint16_t Status::MOVED_PERMANENTLY = 301;
const uint16_t Status::FOUND = 302;
const uint16_t Status::SEE_OTHER = 303;
const uint16_t Status::NOT_MODIFIED = 304;
const uint16_t Status::USE_PROXY = 305;
const uint16_t Status::TEMPORARY_REDIRECT = 307;
const uint16_t Status::BAD_REQUEST = 400;
const uint16_t Status::UNAUTHORIZED = 401;
const uint16_t Status::PAYMENT_REQUIRED = 402;
const uint16_t Status::FORBIDDEN = 403;
const uint16_t Status::NOT_FOUND = 404;
const uint16_t Status::METHOD_NOT_ALLOWED = 405;
const uint16_t Status::NOT_ACCEPTABLE = 406;
const uint16_t Status::PROXY_AUTHENTICATION_REQUIRED = 407;
const uint16_t Status::REQUEST_TIMEOUT = 408;
const uint16_t Status::CONFLICT = 409;
const uint16_t Status::GONE = 410;
const uint16_t Status::LENGTH_REQUIRED = 411;
const uint16_t Status::PRECONDITION_FAILED = 412;
const uint16_t Status::REQUEST_ENTITY_TOO_LARGE = 413;
const uint16_t Status::REQUEST_URI_TOO_LARGE = 414;
const uint16_t Status::UNSUPPORTED_MEDIA_TYPE = 415;
const uint16_t Status::REQUESTED_RANGE_NOT_SATISFIABLE = 416;
const uint16_t Status::EXPECTATION_FAILED = 417;
const uint16_t Status::INTERNAL_SERVER_ERROR = 500;
const uint16_t Status::NOT_IMPLEMENTED = 501;
const uint16_t Status::BAD_GATEWAY = 502;
const uint16_t Status::SERVICE_UNAVAILABLE = 503;
const uint16_t Status::GATEWAY_TIMEOUT = 504;
const uint16_t Status::HTTP_VERSION_NOT_SUPPORTED = 505;
// Since the status codes are stored in increasing order, we could also
// use std::lower_bound to do the lookup with logarithmic complexity.
// However, according to some cursory research, on most CPUs this will
// be slower until the array size is around 100 elemnts.
//
// [1]: https://schani.wordpress.com/2010/04/30/linear-vs-binary-search/
// [2]: https://stackoverflow.com/questions/1275665/at-which-n-does-binary-search-become-faster-than-linear-search-on-a-modern-cpu
string Status::string(uint16_t code)
{
auto value = std::find_if(
std::begin(statuses),
std::end(statuses),
[code](const StatusDescription& sd) { return sd.code == code; });
if (value != std::end(statuses)) {
return value->description;
}
// Fallback for unknown status codes.
return stringify(code);
}
bool isValidStatus(uint16_t code)
{
return std::end(statuses) != std::find_if(
std::begin(statuses),
std::end(statuses),
[code](const StatusDescription& sd) { return sd.code == code; });
}
// Returns the default port for a given URL scheme.
static Option<uint16_t> defaultPort(const string& scheme)
{
// TODO(tnachen): Make default port a lookup table.
if (scheme == "http") {
return 80;
} else if (scheme == "https") {
return 443;
}
return None();
}
Try<URL> URL::parse(const string& urlString)
{
// TODO(tnachen): Consider using C++11 regex support instead.
size_t schemePos = urlString.find("://");
if (schemePos == string::npos) {
return Error("Missing scheme in url string");
}
const string scheme = strings::lower(urlString.substr(0, schemePos));
const string urlPath = urlString.substr(schemePos + 3);
size_t pathPos = urlPath.find_first_of('/');
if (pathPos == 0) {
return Error("Host not found in url");
}
// If path is specified in the URL, try to capture the host and path
// separately.
string host = urlPath;
string path = "/";
if (pathPos != string::npos) {
host = host.substr(0, pathPos);
path = urlPath.substr(pathPos);
}
if (host.empty()) {
return Error("Host not found in url");
}
const vector<string> tokens = strings::tokenize(host, ":");
if (tokens[0].empty()) {
return Error("Host not found in url");
}
if (tokens.size() > 2) {
return Error("Found multiple ports in url");
}
Option<uint16_t> port;
if (tokens.size() == 2) {
Try<uint16_t> numifyPort = numify<uint16_t>(tokens[1]);
if (numifyPort.isError()) {
return Error("Failed to parse port: " + numifyPort.error());
}
port = numifyPort.get();
} else {
// Attempt to resolve the port based on the URL scheme.
port = defaultPort(scheme);
}
if (port.isNone()) {
return Error("Unable to determine port from url");
}
// TODO(tnachen): Support parsing query and fragment.
return URL(scheme, tokens[0], port.get(), path);
}
bool URL::isAbsolute() const
{
return scheme.isSome();
}
bool Request::acceptsEncoding(const string& encoding) const
{
// From RFC 2616:
//
// 1. If the content-coding is one of the content-codings listed in
// the Accept-Encoding field, then it is acceptable, unless it is
// accompanied by a qvalue of 0. (As defined in section 3.9, a
// qvalue of 0 means "not acceptable.")
//
// 2. The special "*" symbol in an Accept-Encoding field matches any
// available content-coding not explicitly listed in the header
// field.
//
// 3. If multiple content-codings are acceptable, then the acceptable
// content-coding with the highest non-zero qvalue is preferred.
//
// 4. The "identity" content-coding is always acceptable, unless
// specifically refused because the Accept-Encoding field includes
// "identity;q=0", or because the field includes "*;q=0" and does
// not explicitly include the "identity" content-coding. If the
// Accept-Encoding field-value is empty, then only the "identity"
// encoding is acceptable.
//
// If no Accept-Encoding field is present in a request, the server
// MAY assume that the client will accept any content coding. In
// this case, if "identity" is one of the available content-codings,
// then the server SHOULD use the "identity" content-coding...
Option<string> accept = headers.get("Accept-Encoding");
if (accept.isNone() || accept->empty()) {
return false;
}
// Remove spaces and tabs for easier parsing.
accept = strings::remove(accept.get(), " ");
accept = strings::remove(accept.get(), "\t");
accept = strings::remove(accept.get(), "\n");
// First we'll look for the encoding specified explicitly, then '*'.
vector<string> candidates;
candidates.push_back(encoding); // Rule 1.
candidates.push_back("*"); // Rule 2.
foreach (const string& candidate, candidates) {
// Is the candidate one of the accepted encodings?
foreach (const string& encoding_, strings::tokenize(accept.get(), ",")) {
vector<string> tokens = strings::tokenize(encoding_, ";");
if (tokens.empty()) {
continue;
}
if (strings::lower(tokens[0]) == strings::lower(candidate)) {
// Is there a 0 q value? Ex: 'gzip;q=0.0'.
const map<string, vector<string>> values =
strings::pairs(encoding_, ";", "=");
// Look for { "q": ["0"] }.
if (values.count("q") == 0 || values.find("q")->second.size() != 1) {
// No q value, or malformed q value.
return true;
}
// Is the q value > 0?
Try<double> value = numify<double>(values.find("q")->second[0]);
return value.isSome() && value.get() > 0;
}
}
}
return false;
}
bool Request::acceptsMediaType(const string& mediaType) const
{
return _acceptsMediaType(headers.get("Accept"), mediaType);
}
bool Request::acceptsMediaType(
const string& name,
const string& mediaType) const
{
return _acceptsMediaType(headers.get(name), mediaType);
}
bool Request::_acceptsMediaType(
Option<string> name,
const string& mediaType) const
{
vector<string> mediaTypes = strings::tokenize(mediaType, "/");
if (mediaTypes.size() != 2) {
return false;
}
// If no header field is present, then it is assumed
// that the client accepts all media types.
if (name.isNone()) {
return true;
}
// Remove spaces and tabs for easier parsing.
name = strings::remove(name.get(), " ");
name = strings::remove(name.get(), "\t");
name = strings::remove(name.get(), "\n");
// First match 'type/subtype', then 'type/*', then '*/*'.
vector<string> candidates;
candidates.push_back(mediaType);
candidates.push_back(mediaTypes[0] + "/*");
candidates.push_back("*/*");
foreach (const string& candidate, candidates) {
foreach (const string& type, strings::tokenize(name.get(), ",")) {
vector<string> tokens = strings::tokenize(type, ";");
if (tokens.empty()) {
continue;
}
// Is the candidate one of the accepted type?
if (strings::lower(tokens[0]) == strings::lower(candidate)) {
// Is there a 0 q value? Ex: 'gzip;q=0.0'.
const map<string, vector<string>> values =
strings::pairs(type, ";", "=");
// Look for { "q": ["0"] }.
if (values.count("q") == 0 || values.find("q")->second.size() != 1) {
// No q value, or malformed q value.
return true;
}
// Is the q value > 0?
Try<double> value = numify<double>(values.find("q")->second[0]);
return value.isSome() && value.get() > 0;
}
}
}
return false;
}
Pipe::Reader Pipe::reader() const
{
return Pipe::Reader(data);
}
Pipe::Writer Pipe::writer() const
{
return Pipe::Writer(data);
}
Future<string> Pipe::Reader::read()
{
synchronized (data->lock) {
if (data->readEnd == Reader::CLOSED) {
return Failure("closed");
} else if (!data->writes.empty()) {
Future<string> future = data->writes.front();
data->writes.pop();
return future;
} else if (data->writeEnd == Writer::CLOSED) {
return ""; // End-of-file.
} else if (data->writeEnd == Writer::FAILED) {
CHECK_SOME(data->failure);
return data->failure.get();
} else {
data->reads.push(Owned<Promise<string>>(new Promise<string>()));
return data->reads.back()->future();
}
}
}
Future<string> Pipe::Reader::readAll()
{
Pipe::Reader reader = *this;
std::shared_ptr<string> buffer(new string());
return loop(
None(),
[=]() mutable {
return reader.read();
},
[=](const string& data) -> ControlFlow<string> {
if (data.empty()) { // EOF.
return Break(std::move(*buffer));
}
buffer->append(data);
return Continue();
});
}
bool Pipe::Reader::close()
{
bool closed = false;
bool notify = false;
queue<Owned<Promise<string>>> reads;
synchronized (data->lock) {
if (data->readEnd == Reader::OPEN) {
// Throw away outstanding data.
while (!data->writes.empty()) {
data->writes.pop();
}
// Extract the pending reads so we can fail them.
std::swap(data->reads, reads);
closed = true;
data->readEnd = Reader::CLOSED;
// Notify if write-end is still open!
notify = data->writeEnd == Writer::OPEN;
}
}
// NOTE: We transition the promises outside the critical section
// to avoid triggering callbacks that try to reacquire the lock.
if (closed) {
while (!reads.empty()) {
reads.front()->fail("closed");
reads.pop();
}
if (notify) {
data->readerClosure.set(Nothing());
} else {
// This future is only satisifed when the reader is closed before
// the write-end of the pipe. In other cases, discard the promise
// in order to clear any associated callbacks.
data->readerClosure.discard();
}
}
return closed;
}
bool Pipe::Writer::write(string s)
{
bool written = false;
Owned<Promise<string>> read;
synchronized (data->lock) {
// Ignore writes if either end of the pipe is closed or failed!
if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
// Don't bother surfacing empty writes to the readers.
if (!s.empty()) {
if (data->reads.empty()) {
data->writes.push(std::move(s));
} else {
read = data->reads.front();
data->reads.pop();
}
}
written = true;
}
}
// NOTE: We set the promise outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
if (read.get() != nullptr) {
read->set(std::move(s)); // NOLINT(misc-use-after-move)
}
return written;
}
bool Pipe::Writer::close()
{
bool closed = false;
queue<Owned<Promise<string>>> reads;
synchronized (data->lock) {
if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can complete them.
std::swap(data->reads, reads);
data->writeEnd = Writer::CLOSED;
closed = true;
}
}
// NOTE: We set the promises outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
while (!reads.empty()) {
reads.front()->set(string("")); // End-of-file.
reads.pop();
}
return closed;
}
bool Pipe::Writer::fail(const string& message)
{
bool failed = false;
queue<Owned<Promise<string>>> reads;
synchronized (data->lock) {
if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can fail them.
std::swap(data->reads, reads);
data->writeEnd = Writer::FAILED;
data->failure = Failure(message);
failed = true;
}
}
// NOTE: We set the promises outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
while (!reads.empty()) {
reads.front()->fail(message);
reads.pop();
}
return failed;
}
Future<Nothing> Pipe::Writer::readerClosed() const
{
return data->readerClosure.future();
}
namespace header {
Try<WWWAuthenticate> WWWAuthenticate::create(const string& input)
{
// Set `maxTokens` as 2 since auth-param quoted string may
// contain space (e.g., "Basic realm="Registry Realm").
vector<string> tokens = strings::tokenize(input, " ", 2);
if (tokens.size() != 2) {
return Error("Unexpected WWW-Authenticate header format: '" + input + "'");
}
// Since the authentication parameters can contain quote values, we
// do not use `strings::split` here since the delimiter may occur in
// a quoted value which should not be split.
hashmap<string, string> authParam;
Option<string> key, value;
bool inQuotes = false;
foreach (char c, tokens[1]) {
// Auth-param values can be a quoted-string or directive values.
// Please see section "3.2.2.4 Directive values and quoted-string":
// https://tools.ietf.org/html/rfc2617.
//
// If we see a quote we know we must already be parsing `value`
// since `key` cannot be a quoted-string.
if (c != '"' && inQuotes) {
if (value.isNone()) {
return Error("Unexpected auth-param format: '" + tokens[1] + "'");
}
value->append({c});
continue;
}
// If we have not yet parsed `key` this character must belong to
// it if it is not a space, and cannot be a `,` delimiter.
if (key.isNone()) {
if (c == ',') {
return Error("Unexpected auth-param format: '" + tokens[1] + "'");
}
if (c == ' ') {
continue;
}
key = string({c});
continue;
}
// If the current character is `=` we must start parsing a new
// `value`. Since we have already handled `=` in quotes above we
// cannot already have started parsing `value`.
if (c == '=') {
if (value.isSome()) {
return Error("Unexpected auth-param format: '" + tokens[1] + "'");
}
value = "";
continue;
}
// If the current character is a quote, drop the
// character and toogle the quote state.
if (c == '"') {
inQuotes = !inQuotes;
continue;
}
// If the current character is a record delimiter and we are not
// in quotes, we should have parsed both a key and a value. Store
// them, drop the delimiter, and restart parsing.
if (c == ',' && !inQuotes) {
if (key.isNone() || value.isNone()) {
return Error("Unexpected auth-param format: '" + tokens[1] + "'");
}
authParam[*key] = *value;
key = None();
value = None();
continue;
}
// If we have not started parsing `value` we are still parsing `key`.
if (value.isNone()) {
key->append({c});
} else {
value->append({c});
}
}
// Record the last parsed `(key, value)` pair.
if (key.isSome()) {
if (value.isNone() || inQuotes) {
return Error("Unexpected auth-param format: '" + tokens[1] + "'");
}
authParam[*key] = *value;
}
// The realm directive (case-insensitive) is required for all
// authentication schemes that issue a challenge.
if (!authParam.contains("realm")) {
return Error(
"Unexpected auth-param '" +
tokens[1] + "': 'realm' is not defined");
}
return WWWAuthenticate(tokens[0], authParam);
}
string WWWAuthenticate::authScheme()
{
return authScheme_;
}
hashmap<string, string> WWWAuthenticate::authParam()
{
return authParam_;
}
} // namespace header {
OK::OK(const JSON::Value& value, const Option<string>& jsonp)
: Response(Status::OK)
{
type = BODY;
if (jsonp.isSome()) {
headers["Content-Type"] = "text/javascript";
string stringified = stringify(value);
body.reserve(jsonp->size() + 1 + stringified.size() + 1);
body += jsonp.get();
body += "(";
body += stringified;
body += ")";
} else {
headers["Content-Type"] = "application/json";
body = stringify(value);
}
headers["Content-Length"] = stringify(body.size());
}
OK::OK(JSON::Proxy&& value, const Option<string>& jsonp)
: Response(Status::OK)
{
type = BODY;
if (jsonp.isSome()) {
headers["Content-Type"] = "text/javascript";
string stringified = std::move(value);
body.reserve(jsonp->size() + 1 + stringified.size() + 1);
body += jsonp.get();
body += "(";
body += stringified;
body += ")";
} else {
headers["Content-Type"] = "application/json";
body = std::move(value);
}
headers["Content-Length"] = stringify(body.size());
}
namespace path {
Try<hashmap<string, string>> parse(const string& pattern, const string& path)
{
// Split the pattern by '/' into keys.
vector<string> keys = strings::tokenize(pattern, "/");
// Split the path by '/' into segments.
vector<string> segments = strings::tokenize(path, "/");
hashmap<string, string> result;
while (!segments.empty()) {
if (keys.empty()) {
return Error(
"Not expecting suffix '" + strings::join("/", segments) + "'");
}
string key = keys.front();
if (strings::startsWith(key, "{") &&
strings::endsWith(key, "}")) {
key = strings::remove(key, "{", strings::PREFIX);
key = strings::remove(key, "}", strings::SUFFIX);
} else if (key != segments.front()) {
return Error("Expecting '" + key + "' not '" + segments.front() + "'");
}
result[key] = segments.front();
keys.erase(keys.begin());
segments.erase(segments.begin());
}
return result;
}
} // namespace path {
string encode(const string& s, const string& additional_chars)
{
ostringstream out;
foreach (unsigned char c, s) {
switch (c) {
// Reserved characters.
case '$':
case '&':
case '+':
case ',':
case '/':
case ':':
case ';':
case '=':
case '?':
case '@':
// Unsafe characters.
case ' ':
case '"':
case '<':
case '>':
case '#':
case '%':
case '{':
case '}':
case '|':
case '\\':
case '^':
case '~':
case '[':
case ']':
case '`':
// NOTE: The cast to unsigned int is needed.
out << '%' << std::setfill('0') << std::setw(2) << std::hex
<< std::uppercase << (unsigned int) c;
break;
default:
// ASCII control characters and non-ASCII characters.
// NOTE: The cast to unsigned int is needed.
if (c < 0x20 ||
c > 0x7F ||
additional_chars.find_first_of(c) != string::npos) {
out << '%' << std::setfill('0') << std::setw(2) << std::hex
<< std::uppercase << (unsigned int) c;
} else {
out << c;
}
break;
}
}
return out.str();
}
Try<string> decode(const string& s)
{
ostringstream out;
for (size_t i = 0; i < s.length(); ++i) {
if (s[i] != '%') {
out << (s[i] == '+' ? ' ' : s[i]);
continue;
}
// We now expect two more characters: "% HEXDIG HEXDIG"
if (i + 2 >= s.length() || !isxdigit(s[i+1]) || !isxdigit(s[i+2])) {
return Error(
"Malformed % escape in '" + s + "': '" + s.substr(i, 3) + "'");
}
// Convert from HEXDIG HEXDIG to char value.
istringstream in(s.substr(i + 1, 2));
unsigned long l;
in >> std::hex >> l;
if (l > UCHAR_MAX) {
ABORT("Unexpected conversion from hex string: " + s.substr(i + 1, 2) +
" to unsigned long: " + stringify(l));
}
out << static_cast<unsigned char>(l);
i += 2;
}
return out.str();
}
Try<vector<Response>> decodeResponses(const string& s)
{
ResponseDecoder decoder;
vector<Response> result;
auto appendResult = [&result](const deque<http::Response*>& responses) {
foreach (Response* response, responses) {
result.push_back(*response);
delete response;
}
};
appendResult(decoder.decode(s.data(), s.length()));
appendResult(decoder.decode("", 0));
if (decoder.failed()) {
return Error("Decoding failed");
}
if (result.empty()) {
return Error("No response decoded");
}
return result;
}
namespace query {
Try<hashmap<string, string>> decode(const string& query)
{
hashmap<string, string> result;
const vector<string> tokens = strings::tokenize(query, ";&");
foreach (const string& token, tokens) {
const vector<string> pairs = strings::split(token, "=", 2);
if (pairs.size() == 0) {
continue;
}
Try<string> key = http::decode(pairs[0]);
if (key.isError()) {
return Error(key.error());
}
if (pairs.size() == 2) {
Try<string> value = http::decode(pairs[1]);
if (value.isError()) {
return Error(value.error());
}
result[key.get()] = value.get();
} else if (pairs.size() == 1) {
result[key.get()] = "";
}
}
return result;
}
string encode(const hashmap<string, string>& query)
{
string output;
foreachpair (const string& key, const string& value, query) {
output += http::encode(key);
if (!value.empty()) {
output += "=" + http::encode(value);
}
output += '&';
}
return strings::remove(output, "&", strings::SUFFIX);
}
} // namespace query {
ostream& operator<<(ostream& stream, const URL& url)
{
// TODO(bevers): The '//' sequence only follows the ':' if
// the next part begins with an "authority". This is required
// for the 'http' scheme, but not guaranteed in general.
if (url.scheme.isSome()) {
stream << url.scheme.get() << "://";
}
if (url.domain.isSome()) {
// Optimize the most common case by not encoding for `http`
// and `https` schemes, since their domain part is already
// guaranteed to only contain unreserved characters.
bool encodeDomain = url.scheme.isNone() ||
(*url.scheme != "http" && *url.scheme != "https");
const std::string& host = encodeDomain
? http::encode(url.domain.get())
: url.domain.get();
stream << host;
} else if (url.ip.isSome()) {
stream << url.ip.get();
}
if (url.port.isSome()) {
stream << ":" << url.port.get();
}
stream << "/" << strings::remove(url.path, "/", strings::PREFIX);
if (!url.query.empty()) {
stream << "?" << query::encode(url.query);
}
if (url.fragment.isSome()) {
stream << "#" << url.fragment.get();
}
return stream;
}
namespace internal {
// Encodes the request by writing into a pipe, the caller can
// read the encoded data from the returned read end of the pipe.
// A pipe is used since the request body can be a pipe and must
// be read asynchronously.
Pipe::Reader encode(const Request& request)
{
// TODO(bmahler): Replace this with a RequestEncoder.
std::ostringstream out;
out << request.method
<< " /" << strings::remove(request.url.path, "/", strings::PREFIX);
if (!request.url.query.empty()) {
// Convert the query to a string that we join via '=' and '&'.
vector<string> query;
foreachpair (const string& key, const string& value, request.url.query) {
query.push_back(http::encode(key) + "=" + http::encode(value));
}
out << "?" << strings::join("&", query);
}
if (request.url.fragment.isSome()) {
out << "#" << request.url.fragment.get();
}
out << " HTTP/1.1\r\n";
// Overwrite headers as necessary.
Headers headers = request.headers;
// Need to specify the 'Host' header.
CHECK(request.url.domain.isSome() || request.url.ip.isSome());
if (request.url.scheme == Some("http+unix")) {
headers["Host"] = "localhost";
} else if (request.url.domain.isSome()) {
headers["Host"] = request.url.domain.get();
} else if (request.url.ip.isSome()) {
headers["Host"] = stringify(request.url.ip.get());
}
// Add port for non-standard ports.
if (request.url.port.isSome() &&
request.url.port != 80 &&
request.url.port != 443) {
headers["Host"] += ":" + stringify(request.url.port.get());
}
if (!request.keepAlive) {
// Tell the server to close the connection when it's done.
headers["Connection"] = "close";
}
if (request.type == Request::PIPE) {
CHECK(!headers.contains("Content-Length"));
// Make sure the Transfer-Encoding header is set correctly
// for PIPE requests.
headers["Transfer-Encoding"] = "chunked";
} else {
CHECK_EQ(Request::BODY, request.type);
// Make sure the Content-Length header is set correctly.
headers["Content-Length"] = stringify(request.body.length());
}
// TODO(bmahler): Use a 'Request' and a 'RequestEncoder' here!
// Currently this does not handle 'gzip' content encoding,
// unless the caller manually compresses the 'body'. For
// streaming requests we must wipe 'gzip' as an acceptable
// encoding as we don't currently have streaming gzip utilities
// to support decoding a streaming gzip response!
// Emit the headers.
foreachpair (const string& key, const string& value, headers) {
out << key << ": " << value << "\r\n";
}
out << "\r\n";
Pipe pipe;
Pipe::Reader reader = pipe.reader();
Pipe::Writer writer = pipe.writer();
// Write the head of the request.
writer.write(out.str());
switch (request.type) {
case Request::BODY:
writer.write(request.body);
writer.close();
break;
case Request::PIPE:
CHECK_SOME(request.reader);
CHECK(request.body.empty());
Pipe::Reader requestReader = request.reader.get();
loop(None(),
[=]() mutable {
return requestReader.read();
},
[=](const string& chunk) mutable -> ControlFlow<Nothing> {
if (chunk.empty()) {
// EOF case.
writer.write("0\r\n\r\n");
writer.close();
return Break();
}
std::ostringstream out;
out << std::hex << chunk.size() << "\r\n";
out << chunk;
out << "\r\n";
writer.write(out.str());
return Continue();
})
.onDiscarded([=]() mutable {
writer.fail("discarded");
})
.onFailed([=](const string& failure) mutable {
writer.fail(failure);
});
break;
}
return reader;
}
// Returns a 'BODY' response once the body of the provided
// 'PIPE' response can be read completely.
Future<Response> convert(const Response& pipeResponse)
{
CHECK_EQ(Response::PIPE, pipeResponse.type);
CHECK_SOME(pipeResponse.reader);
Pipe::Reader reader = pipeResponse.reader.get();
return reader.readAll()
.then([pipeResponse](const string& body) {
Response bodyResponse = pipeResponse;
bodyResponse.type = Response::BODY;
bodyResponse.body = body;
bodyResponse.reader = None(); // Remove the reader.
return bodyResponse;
});
}
class ConnectionProcess : public Process<ConnectionProcess>
{
public:
ConnectionProcess(const network::Socket& _socket)
: ProcessBase(ID::generate("__http_connection__")),
socket(_socket),
sendChain(Nothing()),
close(false) {}
Future<Response> send(const Request& request, bool streamedResponse)
{
if (!disconnection.future().isPending()) {
return Failure("Disconnected");
}
if (close) {
return Failure("Cannot pipeline after 'Connection: close'");
}
if (request.type == Request::PIPE) {
if (request.reader.isNone()) {
return Failure("Request reader must be set for PIPE request");
}
if (!request.body.empty()) {
return Failure("Request body must be empty for PIPE request");
}
Option<string> contentLength = request.headers.get("Content-Length");
if (request.headers.contains("Content-Length")) {
return Failure("'Content-Length' cannot be set for PIPE request");
}
}
if (!request.keepAlive) {
close = true;
}
// We must chain the calls to Socket::send as it
// otherwise interleaves data across calls.
network::Socket socket_ = socket;
sendChain = sendChain
.then([socket_, request]() {
return _send(socket_, encode(request));
});
// If we can't write to the socket, disconnect.
sendChain
.onFailed(defer(self(), [this](const string& failure) {
disconnect(failure);
}));
Promise<Response> promise;
Future<Response> response = promise.future();
pipeline.push(std::make_tuple(streamedResponse, std::move(promise)));
return response;
}
Future<Nothing> disconnect(const Option<string>& message = None())
{
Try<Nothing, SocketError> shutdown = socket.shutdown(
network::Socket::Shutdown::READ_WRITE);
// If a response is still streaming, we send EOF to
// the decoder in order to fail the pipe reader.
if (decoder.writingBody()) {
decoder.decode("", 0);
}
// Fail any remaining pipelined responses.
while (!pipeline.empty()) {
std::get<1>(pipeline.front()).fail(
message.isSome() ? message.get() : "Disconnected");
pipeline.pop();
}
disconnection.set(Nothing());
return shutdown.isSome() ? Future<Nothing>(Nothing())
: Failure(shutdown.error().message);
}
Future<Nothing> disconnected()
{
return disconnection.future();
}
protected:
void initialize() override
{
// Start the read loop on the socket. We read independently
// of the requests being sent in order to detect socket
// closure at any time.
read();
}
void finalize() override
{
disconnect("Connection object was destructed");
}
private:
static Future<Nothing> _send(network::Socket socket, Pipe::Reader reader)
{
return loop(
None(),
[=]() mutable {
return reader.read();
},
[=](const string& data) mutable -> Future<ControlFlow<Nothing>> {
if (data.empty()) {
return Break(); // EOF.
}
return socket.send(data)
.then([]() -> ControlFlow<Nothing> {
return Continue();
});
});
}
void read()
{
socket.recv()
.onAny(defer(self(), &Self::_read, lambda::_1));
}
void _read(const Future<string>& data)
{
deque<Response*> responses;
if (!data.isReady() || data->empty()) {
// Process EOF. Also send EOF to the decoder if a failure
// or discard is encountered.
responses = decoder.decode("", 0);
} else {
// We should only receive data if we're expecting a response
// in the pipeline, or if a response body is still streaming.
if (pipeline.empty() && !decoder.writingBody()) {
disconnect("Received data when none is expected");
return;
}
responses = decoder.decode(data->data(), data->length());
}
// Process any decoded responses.
while (!responses.empty()) {
// We do not expect any responses when the pipeline is empty.
// Note that this may occur when a 'Connection: close' header
// prematurely terminates the pipeline.
if (pipeline.empty()) {
while (!responses.empty()) {
delete responses.front();
responses.pop_front();
}
disconnect("Received response without a request");
return;
}
Response* response = responses.front();
responses.pop_front();
tuple<bool, Promise<Response>> t = std::move(pipeline.front());
pipeline.pop();
bool streamedResponse = std::get<0>(t);
Promise<Response> promise = std::move(std::get<1>(t));
if (streamedResponse) {
promise.set(*response);
} else {
// If the response should not be streamed, we convert
// the PIPE response into a BODY response.
promise.associate(convert(*response));
}
if (response->headers.contains("Connection") &&
response->headers.at("Connection") == "close") {
// This is the last response the server will send!
close = true;
// Fail the remainder of the pipeline.
while (!pipeline.empty()) {
std::get<1>(pipeline.front()).fail(
"Received 'Connection: close' from the server");
pipeline.pop();
}
}
delete response;
}
// We keep reading and feeding data to the decoder until
// EOF or a failure is encountered.
if (!data.isReady()) {
disconnect(data.isFailed() ? data.failure() : "discarded");
return;
} else if (data->empty()) {
disconnect(); // EOF.
return;
} else if (decoder.failed()) {
disconnect("Failed to decode response");
return;
}
// Close the connection if a 'Connection: close' header
// was found and we're done reading the last response.
if (close && pipeline.empty() && !decoder.writingBody()) {
disconnect();
return;
}
read();
}
network::Socket socket;
StreamingResponseDecoder decoder;
Future<Nothing> sendChain;
Promise<Nothing> disconnection;
// For each response in the pipeline, we store a bool for
// whether the caller wants the response to be streamed.
queue<tuple<bool, Promise<Response>>> pipeline;
// Whether the connection should be closed upon the
// completion of the last outstanding response.
bool close;
};
} // namespace internal {
struct Connection::Data
{
// We spawn `ConnectionProcess` as a managed process to guarantee
// that it does not wait on itself (this would cause a deadlock!).
// See MESOS-4658 for details.
//
// TODO(bmahler): This surfaces a general pattern that we
// should enforce: have libprocess manage a Process when
// it cannot be guaranteed that the Process will be waited
// on within a different execution context. More generally,
// we should be passing Process ownership to libprocess to
// ensure all interaction with a Process occurs through a PID.
Data(const network::Socket& s)
: process(spawn(new internal::ConnectionProcess(s), true)) {}
~Data()
{
// Note that we pass 'false' here to avoid injecting the
// termination event at the front of the queue. This is
// to ensure we don't drop any queued request dispatches
// which would leave the caller with a future stuck in
// a pending state.
terminate(process, false);
}
PID<internal::ConnectionProcess> process;
};
Connection::Connection(
const network::Socket& s,
const network::Address& _localAddress,
const network::Address& _peerAddress)
: localAddress(_localAddress), peerAddress(_peerAddress),
data(std::make_shared<Connection::Data>(s)) {}
Future<Response> Connection::send(
const http::Request& request,
bool streamedResponse)
{
return dispatch(
data->process,
&internal::ConnectionProcess::send,
request,
streamedResponse);
}
Future<Nothing> Connection::disconnect()
{
return dispatch(
data->process,
&internal::ConnectionProcess::disconnect,
None());
}
Future<Nothing> Connection::disconnected()
{
return dispatch(
data->process,
&internal::ConnectionProcess::disconnected);
}
Future<Connection> connect(
const network::Address& address,
Scheme scheme,
const Option<string>& peer_hostname)
{
SocketImpl::Kind kind;
switch (scheme) {
case Scheme::HTTP:
case Scheme::HTTP_UNIX:
kind = SocketImpl::Kind::POLL;
break;
#ifdef USE_SSL_SOCKET
case Scheme::HTTPS:
kind = SocketImpl::Kind::SSL;
break;
#endif
}
Try<network::Socket> socket = network::Socket::create(
address.family(),
kind);
if (socket.isError()) {
return Failure("Failed to create socket: " + socket.error());
}
Future<Nothing> connected = [&]() {
switch (scheme) {
case Scheme::HTTP:
case Scheme::HTTP_UNIX:
return socket->connect(address);
#ifdef USE_SSL_SOCKET
case Scheme::HTTPS:
return socket->connect(
address,
network::openssl::create_tls_client_config(peer_hostname));
#endif
}
UNREACHABLE();
}();
return connected
.then([socket, address]() -> Future<Connection> {
Try<network::Address> localAddress = socket->address();
if (localAddress.isError()) {
return Failure("Failed to get socket's local address: " +
localAddress.error());
}
return Connection(socket.get(), localAddress.get(), address);
});
}
Future<Connection> connect(
const network::Address& address,
Scheme scheme)
{
return connect(address, scheme, None());
}
Future<Connection> connect(const URL& url)
{
// Default to 'http' if no scheme was specified.
string scheme = url.scheme.getOrElse("http");
bool inetAddressRequired = scheme == "http" || scheme == "https";
// TODO(bmahler): Move address resolution into the URL class?
Address address = inet4::Address::ANY_ANY();
if (url.ip.isNone() && url.domain.isNone()) {
return Failure("Expected URL.ip or URL.domain to be set");
}
if (url.ip.isSome()) {
address.ip = url.ip.get();
} else if (inetAddressRequired) {
Try<net::IP> ip = net::getIP(url.domain.get(), AF_INET);
if (ip.isError()) {
return Failure("Failed to determine IP of domain '" +
url.domain.get() + "': " + ip.error());
}
address.ip = ip.get();
}
if (url.port.isNone() && inetAddressRequired) {
return Failure("Expecting url.port to be set");
}
if (url.port.isSome()) {
address.port = url.port.get();
}
if (scheme == "http") {
return connect(address, Scheme::HTTP, url.domain);
}
if (scheme == "https") {
#ifdef USE_SSL_SOCKET
return connect(address, Scheme::HTTPS, url.domain);
#else
return Failure("'https' scheme requires SSL enabled");
#endif
}
#ifndef __WINDOWS__
if (scheme == "http+unix") {
if (!url.domain.isSome()) {
return Failure("'http+unix' scheme requires domain (filesystem path)");
}
Try<network::unix::Address> address =
network::unix::Address::create(url.domain.get());
if (address.isError()) {
return Failure(strings::format(
"Could not create address from %s: %s",
url.domain.get(),
address.error()).get());
}
return connect(*address, Scheme::HTTP, url.domain);
}
#endif // __WINDOWS__
return Failure("Unsupported URL scheme");
}
namespace internal {
Future<Nothing> send(network::Socket socket, Encoder* encoder)
{
size_t* size = new size_t(0);
return loop(
None(),
[=]() {
switch (encoder->kind()) {
case Encoder::DATA: {
const char* data = static_cast<DataEncoder*>(encoder)->next(size);
return socket.send(data, *size);
}
case Encoder::FILE: {
off_t offset = 0;
int_fd fd = static_cast<FileEncoder*>(encoder)->next(&offset, size);
return socket.sendfile(fd, offset, *size);
}
}
UNREACHABLE();
},
[=](size_t length) -> ControlFlow<Nothing> {
// Update the encoder with the amount sent.
encoder->backup(*size - length);
// See if there is any more of the message to send.
if (encoder->remaining() != 0) {
return Continue();
}
return Break();
})
.onAny([=]() {
delete size;
});
}
Future<Nothing> send(
network::Socket socket,
const Response& response,
Request* request)
{
CHECK(response.type == Response::BODY ||
response.type == Response::NONE);
Encoder* encoder = new HttpResponseEncoder(response, *request);
return send(socket, encoder)
.onAny([=]() {
delete encoder;
});
}
Future<Nothing> sendfile(
network::Socket socket,
Response response,
Request* request)
{
CHECK(response.type == Response::PATH);
// Make sure no body is sent (this is really an error and
// should be reported and no response sent.
response.body.clear();
Try<int_fd> fd = os::open(response.path, O_CLOEXEC | O_NONBLOCK | O_RDONLY);
if (fd.isError()) {
const string body = "Failed to open '" + response.path + "': " + fd.error();
// TODO(benh): VLOG(1)?
// TODO(benh): Don't send error back as part of InternalServiceError?
// TODO(benh): Copy headers from `response`?
return send(socket, InternalServerError(body), request);
}
const Try<Bytes> size = os::stat::size(fd.get());
if (size.isError()) {
const string body =
"Failed to fstat '" + response.path + "': " + size.error();
// TODO(benh): VLOG(1)?
// TODO(benh): Don't send error back as part of InternalServiceError?
// TODO(benh): Copy headers from `response`?
os::close(fd.get());
return send(socket, InternalServerError(body), request);
} else if (os::stat::isdir(fd.get())) {
const string body = "'" + response.path + "' is a directory";
// TODO(benh): VLOG(1)?
// TODO(benh): Don't send error back as part of InternalServiceError?
// TODO(benh): Copy headers from `response`?
os::close(fd.get());
return send(socket, InternalServerError(body), request);
}
// While the user is expected to properly set a 'Content-Type'
// header, we'll fill in (or overwrite) 'Content-Length' header.
response.headers["Content-Length"] = stringify(size->bytes());
// TODO(benh): If this is a TCP socket consider turning on TCP_CORK
// for both sends and then turning it off.
Encoder* encoder = new HttpResponseEncoder(response, *request);
return send(socket, encoder)
.onAny([=](const Future<Nothing>& future) {
delete encoder;
// Close file descriptor if we aren't doing any more sending.
if (future.isDiscarded() || future.isFailed()) {
os::close(fd.get());
}
})
.then([=]() mutable -> Future<Nothing> {
// NOTE: the file descriptor gets closed by FileEncoder.
Encoder* encoder = new FileEncoder(fd.get(), size->bytes());
return send(socket, encoder)
.onAny([=]() {
delete encoder;
});
});
}
Future<Nothing> stream(
const network::Socket& socket,
http::Pipe::Reader reader)
{
return loop(
None(),
[=]() mutable {
return reader.read();
},
[=](const string& data) mutable {
bool finished = false;
ostringstream out;
if (data.empty()) {
// Finished reading.
out << "0\r\n" << "\r\n";
finished = true;
} else {
out << std::hex << data.size() << "\r\n";
out << data;
out << "\r\n";
}
Encoder* encoder = new DataEncoder(out.str());
return send(socket, encoder)
.onAny([=]() {
delete encoder;
})
.then([=]() mutable -> ControlFlow<Nothing> {
if (!finished) {
return Continue();
}
return Break();
});
});
}
Future<Nothing> stream(
const network::Socket& socket,
Response response,
Request* request)
{
CHECK(response.type == Response::PIPE);
// Make sure no body is sent (this is really an error and
// should be reported and no response sent).
response.body.clear();
if (response.reader.isNone()) {
// This is clearly a programmer error, we don't have a reader from
// which to stream! We return an `InternalServerError` rather than
// failing just as we do in `sendfile` when we get a malformed
// response.
const string body = "Missing data to stream";
// TODO(benh): VLOG(1)?
// TODO(benh): Don't send error back as part of InternalServiceError?
// TODO(benh): Copy headers from `response`?
return send(socket, InternalServerError(body), request);
}
// While the user is expected to properly set a 'Content-Type'
// header, we'll fill in (or overwrite) 'Transfer-Encoding' header.
response.headers["Transfer-Encoding"] = "chunked";
Encoder* encoder = new HttpResponseEncoder(response, *request);
return send(socket, encoder)
.onAny([=]() {
delete encoder;
})
.then([=]() {
return stream(socket, response.reader.get());
})
// Regardless of whether `send` or `stream` completed successfully
// or failed we close the reader so any writers will be notified.
.onAny([=]() mutable {
response.reader->close();
});
}
struct Item
{
Request* request;
Future<Response> response;
};
Future<Nothing> send(
network::Socket socket,
Queue<Option<Item>> pipeline)
{
return loop(
[=]() mutable {
return pipeline.get();
},
[=](const Option<Item>& item) -> Future<ControlFlow<Nothing>> {
if (item.isNone()) {
return Break();
}
Request* request = item->request;
Future<Response> response = item->response;
// We do an `await` here so that we won't wait forever in the
// event that we've discarded the loop but `response` is
// abandoned or taking too long.
return await(response)
.recover([](const Future<Future<Response>>& future)
-> Future<Response> {
if (future.isFailed()) {
return InternalServerError(
"Failed to wait for response: " + future.failure());
}
// Either `response` was abandoned or the loop has been
// discarded so we return `ServiceUnavailable` so we can
// continue the loop or break out if we've been discarded.
return ServiceUnavailable();
})
.then([](const Future<Response>& future) -> Response {
if (future.isFailed()) {
return InternalServerError(future.failure());
} else if (future.isDiscarded()) {
return ServiceUnavailable();
}
return future.get();
})
.then([=](const Response& response) {
// TODO(benh): Should any generated InternalServerError
// responses due to bugs in the Response passed to us cause
// us to return a Failure here rather than keep processing
// more requests/responses?
return [&]() {
switch (response.type) {
case Response::PATH: return sendfile(socket, response, request);
case Response::PIPE: return stream(socket, response, request);
case Response::BODY:
case Response::NONE: return send(socket, response, request);
}
UNREACHABLE();
}()
.then([=]() -> ControlFlow<Nothing> {
// Persist the connection if the request expects it and
// the response doesn't include 'Connection: close'.
bool persist = request->keepAlive;
if (response.headers.contains("Connection")) {
if (response.headers.at("Connection") == "close") {
persist = false;
}
}
if (persist) {
return Continue();
}
return Break();
});
})
.onAny([=]() {
delete request;
});
});
}
Future<Nothing> receive(
network::Socket socket,
std::function<Future<Response>(const Request&)>&& f,
Queue<Option<Item>> pipeline)
{
// Get the peer address to augment any requests we receive.
Try<network::Address> address = socket.peer();
if (address.isError()) {
return Failure("Failed to get peer address: " + address.error());
}
const size_t size = io::BUFFERED_READ_SIZE;
char* data = new char[size];
StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
return loop(
[=]() {
return socket.recv(data, size);
},
[=](size_t length) mutable -> Future<ControlFlow<Nothing>> {
if (length == 0) {
return Break();
}
// Decode as much of the data as possible into HTTP requests.
const deque<Request*> requests = decoder->decode(data, length);
// NOTE: it's possible the decoder has failed but some
// requests might be available, i.e., `requests.empty()` is
// not true, so we wait to return a `Failure` until when there
// are no requests.
if (decoder->failed() && requests.empty()) {
return Failure("Decoder error while receiving");
}
foreach (Request* request, requests) {
request->client = address.get();
// TODO(benh): To support HTTP pipelining we invoke `f`
// regardless of whether the previous response has been
// completed. This can make handling of requests more
// difficult so we could consider supporting disabling HTTP
// pipelining via some sort of "options" initially passed
// in.
pipeline.put(Item{request, f(*request)});
}
return Continue(); // Keep looping!
})
.onAny([=]() {
delete decoder;
delete[] data;
});
}
Future<Nothing> serve(
network::Socket socket,
std::function<Future<Response>(const Request&)>&& f)
{
// HTTP serving is implemented by running two loops, a "receive"
// loop and a "send" loop. The receive loop passes the pipeline of
// request/responses via a `Queue` to the send loop that is
// responsible for sending the response back to the client. A `None`
// passed on the queue signifies that receiving has completed.
//
// TODO(benh): Replace this with something like `Stream` that can
// give us completion semantics without having to encode them with
// an `Option` like we do here.
Queue<Option<Item>> pipeline;
Future<Nothing> receiving =
receive(socket, std::move(f), pipeline)
.onAny([=]() mutable {
// Either:
//
// (1) An EOF was received.
// (2) A failure occurred while receiving.
// (3) Receiving was discarded (likely because serving was
// discarded).
//
// In all cases the best course of action is to signify that
// no more items will be enqueued on the `pipeline` and in
// the case of (2) or (3) shutdown the read end of the
// socket so the client recognizes it can't send any more
// requests.
//
// Note that we don't look at the return value of
// `Socket::shutdown` because the socket might already be
// shutdown!
pipeline.put(None());
socket.shutdown(network::Socket::Shutdown::READ);
});
Future<Nothing> sending =
send(socket, pipeline)
.onAny([=]() mutable {
// Either:
//
// (1) HTTP connection is not meant to be persistent or
// there are no more items expected in the pipeline.
// (2) A failure occurred while sending.
// (3) Sending was discarded (likely because serving was
// discarded).
//
// In all cases the best course of action is to shutdown the
// socket which will also force receiving to complete.
//
// Note that we don't look at the return value of
// `Socket::shutdown` because the socket might already be
// shutdown!
//
// CAREFUL! We can't shutdown with Shutdown::READ_WRITE
// because on OSX if the socket is already shutdown with READ
// due to the call above then the call will fail rather than
// just treat it like a shutdown WRITE.
socket.shutdown(network::Socket::Shutdown::READ);
socket.shutdown(network::Socket::Shutdown::WRITE);
});
std::shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
promise->future().onDiscard([=]() mutable {
receiving.discard();
sending.discard();
});
await(sending, receiving)
.onAny([=]() mutable {
// Delete remaining requests and discard remaining responses.
if (pipeline.size() != 0) {
loop(None(),
[=]() mutable {
return pipeline.get();
},
[=](Option<Item> item) -> ControlFlow<Nothing> {
if (item.isNone()) {
return Break();
}
delete item->request;
if (promise->future().hasDiscard()) {
item->response.discard();
}
return Continue();
});
}
if (receiving.isReady() && sending.isReady()) {
promise->set(Nothing());
} else if (receiving.isFailed() && sending.isFailed()) {
promise->fail("Failed to receive (" + receiving.failure() +
") and send (" + sending.failure() + ")");
} else if (receiving.isFailed()) {
promise->fail("Failed to receive: " + receiving.failure());
} else if (sending.isFailed()) {
promise->fail("Failed to send: " + sending.failure());
} else {
CHECK(receiving.isDiscarded() || sending.isDiscarded());
promise->discard();
}
});
return promise->future();
}
} // namespace internal {
class ServerProcess : public Process<ServerProcess>
{
public:
ServerProcess(
network::Socket&& socket,
std::function<Future<Response>(
const network::Socket&,
const Request&)>&& f)
: socket(std::move(socket)),
f(std::move(f)),
state(State::INITIALIZED) {}
// `Server` implementation.
Future<Nothing> run()
{
return state.transition<State::INITIALIZED, State::RUNNING>([=]() {
// Start the accept loop and store the future so we can later
// discard it when we need to stop the server.
accepting = loop(
self(),
[=]() {
return socket.accept();
},
[=](const network::Socket& socket) -> ControlFlow<Nothing> {
// If we've transitioned to STOPPING we should break. It
// may seem like we should never get here because we
// discard the accept loop before we transition to
// STOPPING but it's possible that we've already
// dispatched this lambda and it is only now getting
// invoked. It's critical that we break because after we
// transition to STOPPING we assume that `clients` will
// not be modified by the accept loop.
if (state.is<State::STOPPING>()) {
return Break();
}
Client client = {
/* .socket = */ socket,
/* .serving = */ http::serve(
socket,
[=](const Request& request) {
return f(socket, request);
})
};
clients.put(socket, client);
client.serving
.onAny(defer(self(), [=](const Future<Nothing>&) {
clients.erase(socket);
}));
return Continue();
});
// We return a _discardable_ `accepting` so the caller can stop
// running a server by "discarding the run", for example:
//
// Future<Nothing> run = server.run();
// run.discard();
//
// Even if we returned an _undiscardable_ `accepting` we still
// need to do a `recover()` on it in the event that the accept
// loop fails (or is abandoned) so we can stop the server (if it
// isn't already being stopped). If `accepting` completes
// successfully then we must be stopping so just wait until
// we've stopped!
return accepting
.then(defer(self(), [=]() {
return state.when<State::STOPPED>();
}))
.recover(defer(self(), [=](const Future<Nothing>& future) {
// If the accept loop completes because it is abandoned,
// discarded, or failed we want to stop the server if it
// isn't already being stopped. In fact, someone stopping
// the server might be the reason we're executing this
// callback because `stop()` discards `accepting`.
//
// If we're not already stopping, i.e., the state is
// RUNNING, then it's safe to assume that the accept loop
// has (1) failed or (2) been discarded because someone did
// a discard on the future returned from `run()`. In either
// of these cases we initiate a `stop()` ourselves and after
// that completes return a failure capturing the reason we
// had to initiate the stop.
if (state.is<State::RUNNING>()) {
return stop(Server::DEFAULT_STOP_OPTIONS())
.then([=]() -> Future<Nothing> {
return Failure(stringify(future));
});
}
// Otherwise we must already be stopping so just wait till
// we're stopped.
return state.when<State::STOPPED>();
}));
});
}
Future<Nothing> stop(const Server::StopOptions& options)
{
return state.transition<State::RUNNING, State::STOPPING>([=]() {
// We make stopping be undiscardable to ensure that we properly
// cleanup. This has the added benefit of simplifying having to
// reason about how discards may propagate here when someone
// discards the future returned from `run()` after they've
// already called `stop()`.
return undiscardable([=]() {
accepting.discard();
// In addition to discarding the accept loop we also attempt
// to stop accepting new clients via shutting down the
// socket. Shutting down the socket on Linux will keep further
// connections from being established even though we haven't
// closed the socket but on OS X connections will keep queuing
// (in fact, calling `shutdown()` is an error, hence we don't
// check the return value).
socket.shutdown(network::Socket::Shutdown::READ_WRITE);
// TODO(benh): ideally we also try and shut down the read end
// of existing clients to signal that we won't handle any new
// requests. Note that doing this is expected by the
// specification, see section 8.1.4 in
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html. Something
// like:
//
// foreachvalue (Client& client, clients) {
// client.socket.shutdown(network::Socket::Shutdown::READ);
// }
//
// Doing this is non-trivial, however, because in practice
// what most servers do is continue to read any partial
// requests until some point at which no more requests appear
// to be coming in. This will require significant changes in
// `http::serve()`, hence it's left as a TODO.
// Wait for the current clients to finish (we know no more
// clients will get added because we set `stopping` and the
// accept loop will respect that (see `run()`).
return await(lambda::map(
[](Client&& client) {
return client.serving;
},
clients.values()))
// After the grace period expires discard all the clients
// and then keep waiting.
.after(options.grace_period,
defer(self(), [=](Future<vector<Future<Nothing>>> f) {
f.discard();
return await(lambda::map(
[](Client&& client) {
client.serving.discard();
return client.serving;
},
clients.values()));
}))
.then(defer(self(), [=]() {
clients.clear();
// We `await()` the accept loop because we don't care how
// it completes, we just want it to complete.
return await(accepting)
.then(defer(self(), [=]() -> Future<Nothing> {
return state.transition<State::STOPPING, State::STOPPED>();
}));
}));
}());
},
"Server must be started in order to be stopped");
}
protected:
void finalize() override
{
// If we started the accept loop then discard it and any clients
// we are already serving.
accepting.discard();
// NOTE: we know that no more sockets will be accepted because
// the accept loop is on `self()` and we're in `finalize()`.
foreachvalue (Client& client, clients) {
client.serving.discard();
}
clients.clear();
}
private:
network::Socket socket;
std::function<Future<Response>(const network::Socket&, const Request&)> f;
enum class State
{
INITIALIZED,
RUNNING,
STOPPING,
STOPPED,
};
StateMachine<State> state;
Future<Nothing> accepting;
struct Client
{
network::Socket socket;
Future<Nothing> serving;
};
hashmap<int_fd, Client> clients;
};
Try<Server> Server::create(
network::Socket socket,
std::function<Future<Response>(const network::Socket&, const Request&)>&& f,
const CreateOptions& options)
{
// NOTE: we start listening on the socket here so that a client can
// attempt to connect to the server even before `Server::run` has
// been called. If we postpone calling `Socket::listen` until we
// invoke `Server::run` there is a race between when `Server::run`
// returns and when `Socket::listen` actually gets called because
// `Server::run` dispatches to `ServerProcess::run`. This is likely
// not a problem in practice but is definitely problematic with
// tests that try and start making connections immediately after
// `Server::run` has returned but potentially before
// `Socket::listen` has been invoked.
Try<Nothing> listen = socket.listen(static_cast<int>(options.backlog));
if (listen.isError()) {
return Error("Failed to listen on socket: " + listen.error());
}
return Server(std::move(socket), std::move(f));
}
Try<Server> Server::create(
const network::Address& address,
std::function<Future<Response>(const network::Socket&, const Request&)>&& f,
const Server::CreateOptions& options)
{
SocketImpl::Kind kind = [&]() {
switch (options.scheme) {
case Scheme::HTTP:
case Scheme::HTTP_UNIX:
return SocketImpl::Kind::POLL;
#ifdef USE_SSL_SOCKET
case Scheme::HTTPS:
return SocketImpl::Kind::SSL;
#endif
}
UNREACHABLE();
}();
Try<network::Socket> socket = network::Socket::create(
address.family(),
kind);
if (socket.isError()) {
return Error("Failed to create socket: " + socket.error());
}
Try<network::Address> bind = socket->bind(address);
if (bind.isError()) {
return Error(
"Failed to bind to address '" + stringify(address) + "': "
+ bind.error());
}
return create(socket.get(), std::move(f), options);
}
Server::Server(
network::Socket&& socket,
std::function<Future<Response>(const network::Socket&, const Request&)>&& f)
: socket(socket),
process(new ServerProcess(std::move(socket), std::move(f)))
{
spawn(*process);
}
Server::~Server()
{
// `process` may be a `nullptr` if we've moved `this`.
if (process.get() != nullptr) {
terminate(*process);
wait(*process);
}
}
Future<Nothing> Server::run()
{
return dispatch(*process, &ServerProcess::run);
}
Future<Nothing> Server::stop(const Server::StopOptions& options)
{
return dispatch(*process, &ServerProcess::stop, options);
}
Try<network::Address> Server::address() const
{
return socket.address();
}
Request createRequest(
const URL& url,
const string& method,
const Option<Headers>& headers,
const Option<string>& body,
const Option<string>& contentType)
{
Request request;
request.method = method;
request.url = url;
request.keepAlive = false;
if (headers.isSome()) {
request.headers = headers.get();
}
if (body.isSome()) {
request.body = body.get();
}
if (contentType.isSome()) {
request.headers["Content-Type"] = contentType.get();
}
return request;
}
Request createRequest(
const UPID& upid,
const string& method,
bool enableSSL,
const Option<string>& path,
const Option<Headers>& headers,
const Option<string>& body,
const Option<string>& contentType)
{
const string scheme = enableSSL ? "https" : "http";
URL url(scheme, net::IP(upid.address.ip), upid.address.port, upid.id);
if (path.isSome()) {
url.path = strings::join("/", url.path, path.get());
}
return createRequest(url, method, headers, body, contentType);
}
Future<Response> request(const Request& request, bool streamedResponse)
{
// We rely on the connection closing after the response.
CHECK(!request.keepAlive);
return http::connect(request.url)
.then([=](Connection connection) {
Future<Response> response = connection.send(request, streamedResponse);
// This is a non Keep-Alive request which means the connection
// will be closed when the response is received. Since the
// 'Connection' is reference-counted, we must maintain a copy
// until the disconnection occurs.
connection.disconnected()
.onAny([connection]() {});
return response;
});
}
Future<Response> get(
const URL& url,
const Option<Headers>& headers)
{
Request _request;
_request.method = "GET";
_request.url = url;
_request.keepAlive = false;
if (headers.isSome()) {
_request.headers = headers.get();
}
return request(_request, false);
}
Future<Response> get(
const UPID& upid,
const Option<string>& path,
const Option<string>& query,
const Option<Headers>& headers,
const Option<string>& scheme)
{
URL url(
scheme.getOrElse("http"),
net::IP(upid.address.ip),
upid.address.port,
upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
url.path = strings::join("/", url.path, path.get());
}
if (query.isSome()) {
Try<hashmap<string, string>> decode = http::query::decode(
strings::remove(query.get(), "?", strings::PREFIX));
if (decode.isError()) {
return Failure("Failed to decode HTTP query string: " + decode.error());
}
url.query = decode.get();
}
return get(url, headers);
}
Future<Response> post(
const URL& url,
const Option<Headers>& headers,
const Option<string>& body,
const Option<string>& contentType)
{
if (body.isNone() && contentType.isSome()) {
return Failure("Attempted to do a POST with a Content-Type but no body");
}
Request _request;
_request.method = "POST";
_request.url = url;
_request.keepAlive = false;
if (headers.isSome()) {
_request.headers = headers.get();
}
if (body.isSome()) {
_request.body = body.get();
}
if (contentType.isSome()) {
_request.headers["Content-Type"] = contentType.get();
}
return request(_request, false);
}
Future<Response> post(
const UPID& upid,
const Option<string>& path,
const Option<Headers>& headers,
const Option<string>& body,
const Option<string>& contentType,
const Option<string>& scheme)
{
URL url(
scheme.getOrElse("http"),
net::IP(upid.address.ip),
upid.address.port,
upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
url.path = strings::join("/", url.path, path.get());
}
return post(url, headers, body, contentType);
}
Future<Response> requestDelete(
const URL& url,
const Option<Headers>& headers)
{
Request _request;
_request.method = "DELETE";
_request.url = url;
_request.keepAlive = false;
if (headers.isSome()) {
_request.headers = headers.get();
}
return request(_request, false);
}
Future<Response> requestDelete(
const UPID& upid,
const Option<string>& path,
const Option<Headers>& headers,
const Option<string>& scheme)
{
URL url(
scheme.getOrElse("http"),
net::IP(upid.address.ip),
upid.address.port,
upid.id);
if (path.isSome()) {
// TODO(joerg84): Handle 'query' and/or 'fragment' in 'path'.
// This could mean that we either remove these or even fail
// (see MESOS-3163).
url.path = strings::join("/", url.path, path.get());
}
return requestDelete(url, headers);
}
namespace streaming {
Future<Response> get(
const URL& url,
const Option<Headers>& headers)
{
Request _request;
_request.method = "GET";
_request.url = url;
_request.keepAlive = false;
if (headers.isSome()) {
_request.headers = headers.get();
}
return request(_request, true);
}
Future<Response> get(
const UPID& upid,
const Option<string>& path,
const Option<string>& query,
const Option<Headers>& headers,
const Option<string>& scheme)
{
URL url(
scheme.getOrElse("http"),
net::IP(upid.address.ip),
upid.address.port,
upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
url.path = strings::join("/", url.path, path.get());
}
if (query.isSome()) {
Try<hashmap<string, string>> decode = http::query::decode(
strings::remove(query.get(), "?", strings::PREFIX));
if (decode.isError()) {
return Failure("Failed to decode HTTP query string: " + decode.error());
}
url.query = decode.get();
}
return streaming::get(url, headers);
}
Future<Response> post(
const URL& url,
const Option<Headers>& headers,
const Option<string>& body,
const Option<string>& contentType)
{
if (body.isNone() && contentType.isSome()) {
return Failure("Attempted to do a POST with a Content-Type but no body");
}
Request _request;
_request.method = "POST";
_request.url = url;
_request.keepAlive = false;
if (body.isSome()) {
_request.body = body.get();
}
if (headers.isSome()) {
_request.headers = headers.get();
}
if (contentType.isSome()) {
_request.headers["Content-Type"] = contentType.get();
}
return request(_request, true);
}
Future<Response> post(
const UPID& upid,
const Option<string>& path,
const Option<Headers>& headers,
const Option<string>& body,
const Option<string>& contentType,
const Option<string>& scheme)
{
URL url(
scheme.getOrElse("http"),
net::IP(upid.address.ip),
upid.address.port,
upid.id);
if (path.isSome()) {
// TODO(benh): Get 'query' and/or 'fragment' out of 'path'.
url.path = strings::join("/", url.path, path.get());
}
return streaming::post(url, headers, body, contentType);
}
} // namespace streaming {
} // namespace http {
} // namespace process {