blob: 4ef00d11de4ff4e7eb4e44aac204147799f85ac2 [file] [log] [blame]
#include <arpa/inet.h>
#include <stdint.h>
#include <cstring>
#include <deque>
#include <iostream>
#include <string>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/io.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
#include "decoder.hpp"
using std::deque;
using std::string;
using process::http::Request;
using process::http::Response;
namespace process {
namespace http {
hashmap<uint16_t, string> statuses;
namespace internal {
Future<Response> decode(const string& buffer)
{
ResponseDecoder decoder;
deque<Response*> responses = decoder.decode(buffer.c_str(), buffer.length());
if (decoder.failed() || responses.empty()) {
for (size_t i = 0; i < responses.size(); ++i) {
delete responses[i];
}
return Failure("Failed to decode HTTP response:\n" + buffer + "\n");
} else if (responses.size() > 1) {
PLOG(ERROR) << "Received more than 1 HTTP Response";
}
Response response = *responses[0];
for (size_t i = 0; i < responses.size(); ++i) {
delete responses[i];
}
return response;
}
Future<Response> request(
const UPID& upid,
const string& method,
const Option<string>& path,
const Option<string>& query,
const Option<hashmap<string, string> >& _headers,
const Option<string>& body,
const Option<string>& contentType)
{
Try<int> socket = process::socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (socket.isError()) {
return Failure("Failed to create socket: " + socket.error());
}
int s = socket.get();
Try<Nothing> cloexec = os::cloexec(s);
if (!cloexec.isSome()) {
os::close(s);
return Failure("Failed to cloexec: " + cloexec.error());
}
// We use inet_ntop since inet_ntoa is not thread-safe!
char ip[INET_ADDRSTRLEN];
if (inet_ntop(AF_INET, (in_addr*) &upid.ip, ip, INET_ADDRSTRLEN) == NULL) {
return Failure(ErrnoError("Failed to get human-readable IP address for '" +
stringify(upid.ip) + "'"));
}
const string host = string(ip) + ":" + stringify(upid.port);
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(upid.port);
addr.sin_addr.s_addr = upid.ip;
if (connect(s, (sockaddr*) &addr, sizeof(addr)) < 0) {
os::close(s);
return Failure(ErrnoError("Failed to connect to '" + host + "'"));
}
std::ostringstream out;
out << method << " /" << upid.id;
if (path.isSome()) {
out << "/" << path.get();
}
if (query.isSome()) {
out << "?" << query.get();
}
out << " HTTP/1.1\r\n";
// Set up the headers as necessary.
hashmap<string, string> headers;
if (_headers.isSome()) {
headers = _headers.get();
}
// Need to specify the 'Host' header.
headers["Host"] = host;
// Tell the server to close the connection when it's done.
headers["Connection"] = "close";
// Overwrite Content-Type if necessary.
if (contentType.isSome()) {
headers["Content-Type"] = contentType.get();
}
// Make sure the Content-Length is set correctly if necessary.
if (body.isSome()) {
headers["Content-Length"] = stringify(body.get().length());
}
// Emit the headers.
foreachpair (const string& key, const string& value, headers) {
out << key << ": " << value << "\r\n";
}
out << "\r\n";
if (body.isSome()) {
out << body.get();
}
Try<Nothing> nonblock = os::nonblock(s);
if (!nonblock.isSome()) {
os::close(s);
return Failure("Failed to set nonblock: " + nonblock.error());
}
// Need to disambiguate the io::read we want when binding below.
Future<string> (*read)(int) = io::read;
return io::write(s, out.str())
.then(lambda::bind(read, s))
.then(lambda::bind(&internal::decode, lambda::_1))
.onAny(lambda::bind(&os::close, s));
}
} // namespace internal {
Future<Response> get(
const UPID& upid,
const Option<string>& path,
const Option<string>& query,
const Option<hashmap<string, string> >& headers)
{
return internal::request(
upid, "GET", path, query, headers, None(), None());
}
Future<Response> post(
const UPID& upid,
const Option<string>& path,
const Option<hashmap<string, string> >& headers,
const Option<string>& body,
const Option<string>& contentType)
{
if (body.isNone() && contentType.isSome()) {
return Failure("Attempted to do a POST with a Content-Type but no body");
}
return internal::request(
upid, "POST", path, None(), headers, body, contentType);
}
} // namespace http {
} // namespace process {