blob: 05863aa9871d73c34993901017f53eb0f22e7886 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
#define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
#include <glog/logging.h>
#include <functional>
#include <ostream>
#include <string>
#include <tuple>
#include <queue>
#include <utility>
#include <mesos/http.hpp>
#include <mesos/v1/mesos.hpp>
#include <process/async.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/mutex.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/nothing.hpp>
#include <stout/recordio.hpp>
#include <stout/result.hpp>
#include <stout/unreachable.hpp>
#include <stout/uuid.hpp>
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "resource_provider/detector.hpp"
namespace mesos {
namespace internal {
/**
 * HTTP connection handler.
 *
 * Manages the connection to a Call/Event based v1 API like the
 * resource provider API.
 *
 * Two persistent HTTP connections are established with the endpoint
 * reported by the `EndpointDetector`: one for the streaming SUBSCRIBE
 * call and one for all other calls. Whenever a new detection result
 * arrives or either connection is interrupted, the connections are
 * torn down and the endpoint is (re-)detected.
 */
template <typename Call, typename Event>
class HttpConnectionProcess
  : public process::Process<HttpConnectionProcess<Call, Event>>
{
public:
  /**
   * Construct a HTTP connection process.
   *
   * @param prefix prefix of the actor.
   * @param _detector the endpoint detector.
   * @param _contentType the content type expected by this connection.
   * @param _token an optional token; if set, it is sent with every
   *     request as a bearer token in the 'Authorization' header.
   * @param validate a callback which will be invoked when a call
   *     needs to be validated.
   * @param connected a callback which will be invoked when the
   *     connection is established.
   * @param disconnected a callback which will be invoked when the
   *     connection is disconnected.
   * @param received a callback which will be invoked when events
   *     are received.
   */
  HttpConnectionProcess(
      const std::string& prefix,
      process::Owned<EndpointDetector> _detector,
      ContentType _contentType,
      const Option<std::string>& _token,
      const std::function<Option<Error>(const Call&)>& validate,
      const std::function<void(void)>& connected,
      const std::function<void(void)>& disconnected,
      const std::function<void(const std::queue<Event>&)>& received)
    : process::ProcessBase(process::ID::generate(prefix)),
      state(State::DISCONNECTED),
      contentType(_contentType),
      token(_token),
      callbacks {validate, connected, disconnected, received},
      detector(std::move(_detector)) {}

  /**
   * Sends `call` to the currently detected endpoint.
   *
   * A SUBSCRIBE call is only accepted in state CONNECTED and is sent
   * as a streaming request over the subscribe connection. All other
   * calls are only accepted in state SUBSCRIBED and are sent over the
   * non-subscribe connection.
   *
   * @return a future which is satisfied once the endpoint accepted the
   *     call, and failed if the call is rejected locally (validation,
   *     no endpoint, wrong state) or not accepted by the endpoint.
   */
  process::Future<Nothing> send(const Call& call)
  {
    Option<Error> error = callbacks.validate(call);

    if (error.isSome()) {
      return process::Failure(error->message);
    }

    if (endpoint.isNone()) {
      return process::Failure("Not connected to an endpoint");
    }

    if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
      // It might be possible that the client is retrying. We drop the
      // request if we have an ongoing subscribe request in flight or
      // if the client is already subscribed.
      return process::Failure(
          "Cannot process 'SUBSCRIBE' call as the driver is in "
          "state " + stringify(state));
    }

    if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
      // We drop all non-subscribe calls if we are not currently subscribed.
      return process::Failure(
          "Cannot process '" + stringify(call.type()) + "' call "
          "as the driver is in state " + stringify(state));
    }

    CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
    CHECK_SOME(connections);

    VLOG(1) << "Sending " << call.type() << " call to " << endpoint.get();

    process::http::Request request;
    request.method = "POST";
    request.url = endpoint.get();
    request.body = serialize(contentType, call);
    request.keepAlive = true;
    request.headers = {{"Accept", stringify(contentType)},
                       {"Content-Type", stringify(contentType)}};

    if (token.isSome()) {
      request.headers["Authorization"] = "Bearer " + token.get();
    }

    process::Future<process::http::Response> response;
    if (call.type() == Call::SUBSCRIBE) {
      CHECK_EQ(State::CONNECTED, state);
      state = State::SUBSCRIBING;

      // Send a streaming request for Subscribe call.
      response = connections->subscribe.send(request, true);
    } else {
      if (streamId.isSome()) {
        // Set the stream ID associated with this connection.
        request.headers["Mesos-Stream-Id"] = streamId->toString();
      }

      response = connections->nonSubscribe.send(request);
    }

    CHECK_SOME(connectionId);

    // The response is handled on this actor and tagged with the current
    // connection ID so that responses from stale connections are ignored.
    return response.then(
        defer(self(),
              &Self::_send,
              connectionId.get(),
              call,
              lambda::_1));
  }

  // Starts the first endpoint detection. Once an endpoint is detected,
  // a connection attempt to it is dispatched (see `detected()`).
  void start()
  {
    detection = detector->detect(None())
      .onAny(defer(self(), &Self::detected, lambda::_1));
  }

protected:
  // Because we're deriving from a templated base class, we have
  // to explicitly bring these hidden base class names into scope.
  using process::Process<HttpConnectionProcess<Call, Event>>::self;
  typedef HttpConnectionProcess<Call, Event> Self;

  void finalize() override
  {
    disconnect();
  }

  // Handles the result of an endpoint detection.
  //
  // If we were connected to a previous endpoint, the 'disconnected'
  // callback is invoked (serialized through `mutex`). All connection
  // state is then torn down. If a new endpoint was detected, a
  // connection attempt to it is dispatched. In every case a new
  // detection is started so later endpoint changes are noticed.
  void detected(const process::Future<Option<process::http::URL>>& future)
  {
    if (future.isFailed()) {
      LOG(WARNING) << "Failed to detect an endpoint: " << future.failure();

      // TODO(nfnt): A non-retryable error might be the reason for the
      // failed future. In that case the client should be informed
      // about this error and the URL detection aborted.
    }

    // Invoke the disconnected callback if we were previously connected.
    switch (state) {
      case State::CONNECTING:
      case State::DISCONNECTED:
        break;
      case State::CONNECTED:
      case State::SUBSCRIBING:
      case State::SUBSCRIBED: {
        mutex.lock()
          .then(defer(self(), [this]() {
            return process::async(callbacks.disconnected);
          }))
          .onAny(lambda::bind(&process::Mutex::unlock, mutex));
        break;
      }
    }

    disconnect();

    // NOTE: A failed future must be handled before it is dereferenced:
    // `future->...` calls `Future::get()`, which CHECK-fails on a failed
    // future. A failed detection is treated like a discarded one, i.e.,
    // the endpoint is re-detected.
    if (future.isDiscarded() || future.isFailed()) {
      LOG(INFO) << "Re-detecting endpoint";

      endpoint = None();
    } else if (future->isNone()) {
      LOG(INFO) << "Lost endpoint";

      endpoint = None();
    } else {
      endpoint = future->get();

      LOG(INFO) << "New endpoint detected at " << endpoint.get();

      connectionId = id::UUID::random();

      dispatch(self(), &Self::connect, connectionId.get());
    }

    detection = detector->detect(endpoint)
      .onAny(defer(self(), &Self::detected, lambda::_1));
  }

  // Opens the subscribe and non-subscribe connections to `endpoint`,
  // unless `_connectionId` has been superseded in the meantime.
  void connect(const id::UUID& _connectionId)
  {
    // It is possible that a new endpoint was detected while we were
    // waiting to establish a connection with the old endpoint.
    if (connectionId != _connectionId) {
      VLOG(1) << "Ignoring connection attempt from stale connection";
      return;
    }

    CHECK_SOME(endpoint);
    CHECK_EQ(State::DISCONNECTED, state);

    state = State::CONNECTING;

    // We create two persistent connections here, one for subscribe
    // call/streaming response and another for non-subscribe
    // calls/responses.
    collect(
        process::http::connect(endpoint.get()),
        process::http::connect(endpoint.get()))
      .onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
  }

  // Completes a connection attempt. On success, it stores both
  // connections, watches them for interruption, and invokes the
  // 'connected' callback. On failure, it triggers a re-detection.
  void connected(
      const id::UUID& _connectionId,
      const process::Future<std::tuple<
        process::http::Connection, process::http::Connection>>& _connections)
  {
    // It is possible that a new endpoint was detected while we had an
    // ongoing (re-)connection attempt with the old endpoint.
    if (connectionId != _connectionId) {
      VLOG(1) << "Ignoring connection attempt from stale connection";
      return;
    }

    CHECK_EQ(State::CONNECTING, state);

    if (!_connections.isReady()) {
      disconnected(connectionId.get(),
                   _connections.isFailed()
                     ? _connections.failure()
                     : "Connection future discarded");
      return;
    }

    VLOG(1) << "Connected with the remote endpoint at " << endpoint.get();

    state = State::CONNECTED;

    connections = Connections {
        std::get<0>(_connections.get()),
        std::get<1>(_connections.get())};

    connections->subscribe.disconnected()
      .onAny(defer(
          self(),
          &Self::disconnected,
          connectionId.get(),
          "Subscribe connection interrupted"));

    connections->nonSubscribe.disconnected()
      .onAny(defer(
          self(),
          &Self::disconnected,
          connectionId.get(),
          "Non-subscribe connection interrupted"));

    // Invoke the connected callback once we have established both
    // subscribe and non-subscribe connections with the endpoint.
    mutex.lock()
      .then(defer(self(), [this]() {
        return process::async(callbacks.connected);
      }))
      .onAny(lambda::bind(&process::Mutex::unlock, mutex));
  }

  // Tears down all per-connection state: closes both connections and
  // the event stream reader, resets the state to DISCONNECTED, and
  // discards the pending endpoint detection.
  void disconnect()
  {
    if (connections.isSome()) {
      connections->subscribe.disconnect();
      connections->nonSubscribe.disconnect();
    }

    if (subscribed.isSome()) {
      subscribed->reader.close();
    }

    state = State::DISCONNECTED;

    connections = None();
    subscribed = None();
    endpoint = None();
    connectionId = None();

    // The stream ID belongs to the torn-down subscription; a new one is
    // taken from the next successful SUBSCRIBE response. Resetting it
    // prevents sending a stale stream ID if that response lacks one.
    streamId = None();

    detection.discard();
  }

  // Invoked when either connection (or the event stream) of the
  // connection identified by `_connectionId` breaks. `failure`
  // describes the reason.
  void disconnected(const id::UUID& _connectionId, const std::string& failure)
  {
    // Ignore if the disconnection happened from an old stale connection.
    if (connectionId != _connectionId) {
      VLOG(1) << "Ignoring disconnection attempt from stale connection";
      return;
    }

    LOG(INFO) << "Disconnected from endpoint: " << failure;

    // We can reach here if we noticed a disconnection for either of
    // subscribe/non-subscribe connections. We discard the future here
    // to trigger an endpoint re-detection.
    detection.discard();
  }

  // Handles the endpoint's response to `call`.
  //
  // A "200 OK" response (SUBSCRIBE only) moves to SUBSCRIBED and starts
  // reading the event stream. A "202 Accepted" response (non-SUBSCRIBE
  // only) is success. Any other response is returned as a failure, and
  // a failed SUBSCRIBE moves back to CONNECTED so it can be retried.
  process::Future<Nothing> _send(
      const id::UUID& _connectionId,
      const Call& call,
      const process::http::Response& response)
  {
    // It is possible that we detected a new endpoint before a
    // response could be received.
    if (connectionId != _connectionId) {
      return process::Failure("Ignoring response from stale connection");
    }

    CHECK(state == State::SUBSCRIBING || state == State::SUBSCRIBED) << state;

    if (response.code == process::http::Status::OK) {
      // Only SUBSCRIBE call should get a "200 OK" response.
      CHECK_EQ(Call::SUBSCRIBE, call.type());
      CHECK_EQ(process::http::Response::PIPE, response.type);
      CHECK_SOME(response.reader);

      state = State::SUBSCRIBED;

      process::http::Pipe::Reader reader = response.reader.get();

      auto deserializer =
        lambda::bind(deserialize<Event>, contentType, lambda::_1);

      process::Owned<recordio::Reader<Event>> decoder(
          new recordio::Reader<Event>(
              ::recordio::Decoder<Event>(deserializer),
              reader));

      subscribed = SubscribedResponse(reader, std::move(decoder));

      if (response.headers.contains("Mesos-Stream-Id")) {
        Try<id::UUID> uuid =
          id::UUID::fromString(response.headers.at("Mesos-Stream-Id"));

        CHECK_SOME(uuid);

        streamId = uuid.get();
      }

      read();

      return Nothing();
    }

    if (response.code == process::http::Status::ACCEPTED) {
      // Only non SUBSCRIBE calls should get a "202 Accepted" response.
      CHECK_NE(Call::SUBSCRIBE, call.type());
      return Nothing();
    }

    // We reset the state to connected if the subscribe call did not
    // succeed. We can then retry the subscribe call.
    if (call.type() == Call::SUBSCRIBE) {
      state = State::CONNECTED;
    }

    if (response.code == process::http::Status::SERVICE_UNAVAILABLE ||
        response.code == process::http::Status::NOT_FOUND) {
      return process::Failure(
          "Received '" + response.status + "' (" + response.body + ")");
    }

    return process::Failure(
        "Received unexpected '" + response.status +
        "' (" + response.body + ")");
  }

  // Requests the next record from the event stream decoder. The reader
  // is passed along so that `_read()` can detect results that belong to
  // an older subscription.
  void read()
  {
    subscribed->decoder->read()
      .onAny(defer(self(),
                   &Self::_read,
                   subscribed->reader,
                   lambda::_1));
  }

  // Handles one decoded record from the event stream. Decoding failures
  // and end-of-file trigger a disconnection. Records that fail to
  // deserialize are logged and skipped. Otherwise, the event is handed
  // to `receive()` and the next record is requested.
  void _read(
      const process::http::Pipe::Reader& reader,
      const process::Future<Result<Event>>& event)
  {
    CHECK(!event.isDiscarded());

    // Ignore enqueued events from the previous Subscribe call reader.
    if (!subscribed.isSome() || subscribed->reader != reader) {
      VLOG(1) << "Ignoring event from old stale connection";
      return;
    }

    CHECK_EQ(State::SUBSCRIBED, state);
    CHECK_SOME(connectionId);

    if (event.isFailed()) {
      LOG(ERROR) << "Failed to decode stream of events: "
                 << event.failure();

      disconnected(connectionId.get(), event.failure());
      return;
    }

    if (event->isNone()) {
      const std::string error = "End-Of-File received";
      LOG(ERROR) << error;

      disconnected(connectionId.get(), error);
      return;
    }

    if (event->isError()) {
      LOG(ERROR) << "Failed to de-serialize event: " << event->error();
    } else {
      receive(event->get());
    }

    read();
  }

  // Queues `event` and, when it is the first queued event, schedules
  // the 'received' callback (serialized through `mutex`). The callback
  // is passed every event queued by the time it actually runs.
  void receive(const Event& event)
  {
    // Check if we are no longer subscribed but received an event.
    if (state != State::SUBSCRIBED) {
      LOG(WARNING) << "Ignoring " << stringify(event.type())
                   << " event because we're no longer subscribed";
      return;
    }

    // Queue up the event and invoke the 'received' callback if this
    // is the first event (between now and when the 'received'
    // callback actually gets invoked more events might get queued).
    events.push(event);

    if (events.size() == 1) {
      mutex.lock()
        .then(defer(self(), [this]() {
          process::Future<Nothing> future =
            process::async(callbacks.received, events);
          events = std::queue<Event>();
          return future;
        }))
        .onAny(lambda::bind(&process::Mutex::unlock, mutex));
    }
  }

private:
  struct Callbacks
  {
    std::function<Option<Error>(const Call&)> validate;
    std::function<void(void)> connected;
    std::function<void(void)> disconnected;
    std::function<void(const std::queue<Event>&)> received;
  };

  struct Connections
  {
    process::http::Connection subscribe;
    process::http::Connection nonSubscribe;
  };

  struct SubscribedResponse
  {
    SubscribedResponse(
        process::http::Pipe::Reader _reader,
        process::Owned<recordio::Reader<Event>> _decoder)
      : reader(std::move(_reader)),
        decoder(std::move(_decoder)) {}

    // The decoder cannot be copied meaningfully, see MESOS-5122.
    SubscribedResponse(const SubscribedResponse&) = delete;
    SubscribedResponse& operator=(const SubscribedResponse&) = delete;
    SubscribedResponse& operator=(SubscribedResponse&&) = default;
    SubscribedResponse(SubscribedResponse&&) = default;

    process::http::Pipe::Reader reader;
    process::Owned<recordio::Reader<Event>> decoder;
  };

  enum class State
  {
    DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
    CONNECTING, // Trying to establish subscribe and non-subscribe connections.
    CONNECTED, // Established subscribe and non-subscribe connections.
    SUBSCRIBING, // Trying to subscribe with the remote endpoint.
    SUBSCRIBED // Subscribed with the remote endpoint.
  };

  friend std::ostream& operator<<(std::ostream& stream, State state)
  {
    switch (state) {
      case State::DISCONNECTED: return stream << "DISCONNECTED";
      case State::CONNECTING: return stream << "CONNECTING";
      case State::CONNECTED: return stream << "CONNECTED";
      case State::SUBSCRIBING: return stream << "SUBSCRIBING";
      case State::SUBSCRIBED: return stream << "SUBSCRIBED";
    }

    UNREACHABLE();
  }

  State state;
  Option<Connections> connections;
  Option<SubscribedResponse> subscribed;
  Option<process::http::URL> endpoint;
  const mesos::ContentType contentType;
  const Option<std::string> token;
  const Callbacks callbacks;
  process::Mutex mutex; // Used to serialize the callback invocations.
  process::Owned<EndpointDetector> detector;
  std::queue<Event> events;

  // There can be multiple simultaneous ongoing (re-)connection
  // attempts with the remote endpoint (e.g., the endpoint failed over
  // while an attempt was in progress). This helps us in uniquely
  // identifying the current connection instance and ignoring the
  // stale instance.
  Option<id::UUID> connectionId;

  // Stream ID taken from the 'Mesos-Stream-Id' header of the last
  // successful SUBSCRIBE response. It is sent with non-subscribe calls.
  Option<id::UUID> streamId;

  process::Future<Option<process::http::URL>> detection;
};
} // namespace internal {
} // namespace mesos {
#endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__