blob: 35190cf691029331b942c1d87d158f64f81f9036 [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <process/id.hpp>
#include <process/defer.hpp>
#include "encoder.hpp"
#include "http_proxy.hpp"
#include "socket_manager.hpp"
using process::http::InternalServerError;
using process::http::NotFound;
using process::network::inet::Socket;
using process::http::Response;
using process::http::Request;
using std::string;
using std::stringstream;
namespace process {
HttpProxy::HttpProxy(const Socket& _socket)
: ProcessBase(ID::generate("__http__")),
socket(_socket) {}
void HttpProxy::finalize()
{
// Need to make sure response producers know not to continue to
// create a response (streaming or otherwise).
if (pipe.isSome()) {
http::Pipe::Reader reader = pipe.get();
reader.close();
}
pipe = None();
while (!items.empty()) {
Item* item = items.front();
// Attempt to discard the future.
item->future.discard();
// But it might have already been ready. In general, we need to
// wait until this future is potentially ready in order to attempt
// to close a pipe if one exists.
item->future.onReady([](const Response& response) {
// Cleaning up a response (i.e., closing any open Pipes in the
// event Response::type is PIPE).
if (response.type == Response::PIPE) {
CHECK_SOME(response.reader);
http::Pipe::Reader reader = response.reader.get(); // Remove const.
reader.close();
}
});
items.pop();
delete item;
}
// Just in case this process gets killed outside of `SocketManager::close`,
// remove the proxy from the socket.
socket_manager->unproxy(socket);
}
void HttpProxy::enqueue(const Response& response, const Request& request)
{
handle(Future<Response>(response), request);
}
void HttpProxy::handle(const Future<Response>& future, const Request& request)
{
items.push(new Item(request, future));
if (items.size() == 1) {
next();
}
}
void HttpProxy::next()
{
if (items.size() > 0) {
// Wait for any transition of the future.
items.front()->future.onAny(
defer(self(), &HttpProxy::waited, lambda::_1));
}
}
void HttpProxy::waited(const Future<Response>& future)
{
CHECK(items.size() > 0);
Item* item = items.front();
CHECK(future == item->future);
// Process the item and determine if we're done or not (so we know
// whether to start waiting on the next responses).
bool processed = process(item->future, item->request);
items.pop();
delete item;
if (processed) {
next();
}
}
bool HttpProxy::process(const Future<Response>& future, const Request& request)
{
if (!future.isReady()) {
// TODO(benh): Consider handling other "states" of future
// (discarded, failed, etc) with different HTTP statuses.
Response response = future.isFailed()
? InternalServerError(future.failure())
: InternalServerError("discarded future");
VLOG(1) << "Returning '" << response.status << "'"
<< " for '" << request.url.path << "'"
<< " ("
<< (future.isFailed()
? future.failure()
: "discarded") << ")";
socket_manager->send(response, request, socket);
return true; // All done, can process next response.
}
Response response = future.get();
// If the response specifies a path, try and perform a sendfile.
if (response.type == Response::PATH) {
// Make sure no body is sent (this is really an error and
// should be reported and no response sent.
response.body.clear();
const string& path = response.path;
Try<int_fd> fd = os::open(path, O_RDONLY);
if (fd.isError()) {
#ifdef __WINDOWS__
const int error = ::GetLastError();
if (error == ERROR_FILE_NOT_FOUND || error == ERROR_PATH_NOT_FOUND) {
#else
const int error = errno;
if (error == ENOENT || error == ENOTDIR) {
#endif // __WINDOWS__
VLOG(1) << "Returning '404 Not Found' for path '" << path << "'";
socket_manager->send(NotFound(), request, socket);
} else {
VLOG(1) << "Failed to send file at '" << path << "': " << fd.error();
socket_manager->send(InternalServerError(), request, socket);
}
} else {
const Try<Bytes> size = os::stat::size(fd.get());
if (size.isError()) {
VLOG(1) << "Failed to send file at '" << path << "': " << size.error();
socket_manager->send(InternalServerError(), request, socket);
} else if (os::stat::isdir(fd.get())) {
VLOG(1) << "Returning '404 Not Found' for directory '" << path << "'";
socket_manager->send(NotFound(), request, socket);
} else {
// While the user is expected to properly set a 'Content-Type'
// header, we fill in (or overwrite) 'Content-Length' header.
response.headers["Content-Length"] = stringify(size->bytes());
if (size.get() == 0) {
socket_manager->send(response, request, socket);
return true; // All done, can process next request.
}
VLOG(1) << "Sending file at '" << path << "' with length "
<< size.get();
// TODO(benh): Consider a way to have the socket manager turn
// on TCP_CORK for both sends and then turn it off.
socket_manager->send(
new HttpResponseEncoder(response, request),
true,
socket);
// Note the file descriptor gets closed by FileEncoder.
socket_manager->send(
new FileEncoder(fd.get(), size->bytes()),
request.keepAlive,
socket);
}
}
} else if (response.type == Response::PIPE) {
// Make sure no body is sent (this is really an error and
// should be reported and no response sent.
response.body.clear();
// While the user is expected to properly set a 'Content-Type'
// header, we fill in (or overwrite) 'Transfer-Encoding' header.
response.headers["Transfer-Encoding"] = "chunked";
VLOG(3) << "Starting \"chunked\" streaming";
socket_manager->send(
new HttpResponseEncoder(response, request),
true,
socket);
CHECK_SOME(response.reader);
http::Pipe::Reader reader = response.reader.get();
pipe = reader;
// Avoid copying the request for each chunk read on the pipe.
//
// TODO(bmahler): Make request a process::Owned or
// process::Shared from the point where it is decoded.
Owned<Request> request_(new Request(request));
reader.read()
.onAny(defer(self(), &Self::stream, request_, lambda::_1));
return false; // Streaming, don't process next response (yet)!
} else {
socket_manager->send(response, request, socket);
}
return true; // All done, can process next response.
}
void HttpProxy::stream(
const Owned<Request>& request,
const Future<string>& chunk)
{
CHECK_SOME(pipe);
CHECK_NOTNULL(request.get());
http::Pipe::Reader reader = pipe.get();
bool finished = false; // Whether we're done streaming.
if (chunk.isReady()) {
std::ostringstream out;
if (chunk->empty()) {
// Finished reading.
out << "0\r\n" << "\r\n";
finished = true;
} else {
out << std::hex << chunk->size() << "\r\n";
out << chunk.get();
out << "\r\n";
// Keep reading.
reader.read()
.onAny(defer(self(), &Self::stream, request, lambda::_1));
}
// Always persist the connection when streaming is not finished.
socket_manager->send(
new DataEncoder(out.str()),
finished ? request->keepAlive : true,
socket);
} else if (chunk.isFailed()) {
VLOG(1) << "Failed to read from stream: " << chunk.failure();
// TODO(bmahler): Have to close connection if headers were sent!
socket_manager->send(InternalServerError(), *request, socket);
finished = true;
} else {
VLOG(1) << "Failed to read from stream: discarded";
// TODO(bmahler): Have to close connection if headers were sent!
socket_manager->send(InternalServerError(), *request, socket);
finished = true;
}
if (finished) {
reader.close();
pipe = None();
next();
}
}
} // namespace process {