blob: 674483aa80bc74b654343c97892a96f49d5c7ed4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __WINDOWS__
#include <dlfcn.h>
#endif // __WINDOWS__
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef __WINDOWS__
#include <unistd.h>
#endif // __WINDOWS__
#ifndef __WINDOWS__
#include <arpa/inet.h>
#endif // __WINDOWS__
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <sstream>
#include <tuple>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/scheduler.hpp>
#include <mesos/master/detector.hpp>
#include <mesos/module/http_authenticatee.hpp>
#include <process/async.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/mutex.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/metrics/pull_gauge.hpp>
#include <process/metrics/metrics.hpp>
#include <process/ssl/flags.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/flags.hpp>
#include <stout/hashmap.hpp>
#include <stout/ip.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/recordio.hpp>
#include <stout/unreachable.hpp>
#include <stout/uuid.hpp>
#include "authentication/http/basic_authenticatee.hpp"
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "local/local.hpp"
#include "logging/logging.hpp"
#include "master/validation.hpp"
#include "messages/messages.hpp"
#include "module/manager.hpp"
#include "scheduler/flags.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::master;
using namespace process;
using std::get;
using std::ostream;
using std::queue;
using std::shared_ptr;
using std::string;
using std::tuple;
using std::vector;
using mesos::internal::recordio::Reader;
using mesos::master::detector::MasterDetector;
using process::collect;
using process::Failure;
using process::Owned;
using process::wait; // Necessary on some OS's to disambiguate.
using process::http::Connection;
using process::http::Pipe;
using process::http::post;
using process::http::URL;
using ::recordio::Decoder;
namespace mesos {
namespace v1 {
namespace scheduler {
struct Connections
{
bool operator==(const Connections& that) const
{
return subscribe == that.subscribe && nonSubscribe == that.nonSubscribe;
}
Connection subscribe; // Used for subscribe call/response.
Connection nonSubscribe; // Used for all other calls/responses.
};
// The process (below) is responsible for sending/receiving HTTP messages
// to/from the master.
class MesosProcess : public ProtobufProcess<MesosProcess>
{
public:
MesosProcess(
const string& master,
ContentType _contentType,
const lambda::function<void()>& connected,
const lambda::function<void()>& disconnected,
const lambda::function<void(const queue<Event>&)>& received,
const Option<Credential>& _credential,
const Option<shared_ptr<MasterDetector>>& _detector,
const Flags& _flags)
: ProcessBase(ID::generate("scheduler")),
state(DISCONNECTED),
metrics(*this),
contentType(_contentType),
callbacks {connected, disconnected, received},
credential(_credential),
local(false),
flags(_flags)
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
// Initialize libprocess (done here since at some point we might
// want to use flags to initialize libprocess).
process::initialize();
if (self().address.ip.isLoopback()) {
LOG(WARNING) << "\n**************************************************\n"
<< "Scheduler driver bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
<< " You might want to set 'LIBPROCESS_IP' environment"
<< " variable to use a routable IP address.\n"
<< "**************************************************";
}
// Initialize logging.
if (flags.initialize_driver_logging) {
logging::initialize("mesos", false, flags);
} else {
VLOG(1) << "Disabling initialization of GLOG logging";
}
LOG(INFO) << "Version: " << MESOS_VERSION;
// Launch a local cluster if necessary.
Option<UPID> pid = None();
if (master == "local") {
pid = local::launch(flags);
local = true;
}
if (_detector.isNone()) {
Try<MasterDetector*> create =
MasterDetector::create(pid.isSome() ? string(pid.get()) : master);
if (create.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to create a master detector: " << create.error();
}
detector.reset(create.get());
} else {
detector = _detector.get();
}
}
~MesosProcess() override
{
disconnect();
// Check and see if we need to shutdown a local cluster.
if (local) {
local::shutdown();
}
// Note that we ignore any callbacks that are enqueued.
}
void send(const Call& call)
{
Option<Error> error = validation::scheduler::call::validate(devolve(call));
if (error.isSome()) {
drop(call, error->message);
return;
}
// TODO(vinod): Add support for sending MESSAGE calls directly
// to the slave, instead of relaying it through the master, as
// the scheduler driver does.
process::http::Request request;
request.method = "POST";
request.url = master.get();
request.body = serialize(contentType, call);
request.keepAlive = true;
request.headers = {{"Accept", stringify(contentType)},
{"Content-Type", stringify(contentType)}};
VLOG(1) << "Adding authentication headers to " << call.type() << " call to "
<< master.get();
// TODO(tillt): Add support for multi-step authentication protocols.
authenticatee->authenticate(request, credential)
.onAny(defer(self(), &Self::_send, call, lambda::_1));
}
Future<APIResult> call(const Call& callMessage)
{
Option<Error> error =
validation::scheduler::call::validate(devolve(callMessage));
if (error.isSome()) {
return Failure(error->message);
}
if (callMessage.type() == Call::SUBSCRIBE) {
return Failure("This method doesn't support SUBSCRIBE calls");
}
if (state != SUBSCRIBED) {
return Failure(
"Cannot perform calls until subscribed. Current state: " +
stringify(state));
}
VLOG(1) << "Sending " << callMessage.type() << " call to " << master.get();
// TODO(vinod): Add support for sending MESSAGE calls directly
// to the slave, instead of relaying it through the master, as
// the scheduler driver does.
process::http::Request request;
request.method = "POST";
request.url = master.get();
request.body = serialize(contentType, callMessage);
request.keepAlive = true;
request.headers = {{"Accept", stringify(contentType)},
{"Content-Type", stringify(contentType)}};
// TODO(tillt): Add support for multi-step authentication protocols.
return authenticatee->authenticate(request, credential)
.recover([](const Future<process::http::Request>& future) {
return Failure(
stringify("HTTP authenticatee ") +
(future.isFailed() ? "failed: " + future.failure() : "discarded"));
})
.then(defer(self(), &Self::_call, callMessage, lambda::_1));
}
void reconnect()
{
// Ignore the reconnection request if we are currently disconnected
// from the master.
if (state == DISCONNECTED) {
VLOG(1) << "Ignoring reconnect request from scheduler since we are"
<< " disconnected";
return;
}
CHECK_SOME(connectionId);
disconnected(connectionId.get(),
"Received reconnect request from scheduler");
}
protected:
void initialize() override
{
// Initialize modules.
if (flags.modules.isSome() && flags.modulesDir.isSome()) {
EXIT(EXIT_FAILURE) << "Only one of MESOS_MODULES or MESOS_MODULES_DIR "
<< "should be specified";
}
if (flags.modulesDir.isSome()) {
Try<Nothing> result =
modules::ModuleManager::load(flags.modulesDir.get());
if (result.isError()) {
EXIT(EXIT_FAILURE) << "Error loading modules: " << result.error();
}
}
if (flags.modules.isSome()) {
Try<Nothing> result = modules::ModuleManager::load(flags.modules.get());
if (result.isError()) {
EXIT(EXIT_FAILURE) << "Error loading modules: " << result.error();
}
}
// Initialize authenticatee.
if (flags.httpAuthenticatee == DEFAULT_BASIC_HTTP_AUTHENTICATEE) {
LOG(INFO) << "Using default '" << DEFAULT_BASIC_HTTP_AUTHENTICATEE
<< "' HTTP authenticatee";
authenticatee = Owned<mesos::http::authentication::Authenticatee>(
new mesos::http::authentication::BasicAuthenticatee);
} else {
LOG(INFO) << "Using '" << flags.httpAuthenticatee
<< "' HTTP authenticatee";
Try<mesos::http::authentication::Authenticatee*> createdAuthenticatee =
modules::ModuleManager::create<
mesos::http::authentication::Authenticatee>(
flags.httpAuthenticatee);
if (createdAuthenticatee.isError()) {
EXIT(EXIT_FAILURE) << "Failed to load HTTP authenticatee: "
<< createdAuthenticatee.error();
}
authenticatee = Owned<mesos::http::authentication::Authenticatee>(
createdAuthenticatee.get());
}
// Start detecting masters.
detection = detector->detect()
.onAny(defer(self(), &MesosProcess::detected, lambda::_1));
}
void connect(const id::UUID& _connectionId)
{
// It is possible that a new master was detected while we were waiting
// to establish a connection with the old master.
if (connectionId != _connectionId) {
VLOG(1) << "Ignoring connection attempt from stale connection";
return;
}
CHECK_EQ(DISCONNECTED, state);
CHECK_SOME(master);
state = CONNECTING;
auto connector = [this]() -> Future<Connection> {
return process::http::connect(master.get());
};
// We create two persistent connections here, one for subscribe
// call/streaming response and another for non-subscribe calls/responses.
collect(connector(), connector())
.onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
}
void connected(
const id::UUID& _connectionId,
const Future<tuple<Connection, Connection>>& _connections)
{
// It is possible that a new master was detected while we had an ongoing
// (re-)connection attempt with the old master.
if (connectionId != _connectionId) {
VLOG(1) << "Ignoring connection attempt from stale connection";
return;
}
CHECK_EQ(CONNECTING, state);
CHECK_SOME(connectionId);
if (!_connections.isReady()) {
disconnected(connectionId.get(),
_connections.isFailed()
? _connections.failure()
: "Connection future discarded");
return;
}
VLOG(1) << "Connected with the master at " << master.get();
state = CONNECTED;
connections =
Connections {get<0>(_connections.get()), get<1>(_connections.get())};
connections->subscribe.disconnected()
.onAny(defer(self(),
&Self::disconnected,
connectionId.get(),
"Subscribe connection interrupted"));
connections->nonSubscribe.disconnected()
.onAny(defer(self(),
&Self::disconnected,
connectionId.get(),
"Non-subscribe connection interrupted"));
// Invoke the connected callback once we have established both subscribe
// and non-subscribe connections with the master.
mutex.lock()
.then(defer(self(), [this]() {
return async(callbacks.connected);
}))
.onAny(lambda::bind(&Mutex::unlock, mutex));
}
void disconnected(
const id::UUID& _connectionId,
const string& failure)
{
// Ignore if the disconnection happened from an old stale connection.
if (connectionId != _connectionId) {
VLOG(1) << "Ignoring disconnection attempt from stale connection";
return;
}
// We can reach here if we noticed a disconnection for either of
// subscribe/non-subscribe connections. We discard the future here to
// trigger a master re-detection.
detection.discard();
}
void disconnect()
{
if (connections.isSome()) {
connections->subscribe.disconnect();
connections->nonSubscribe.disconnect();
}
if (subscribed.isSome()) {
subscribed->reader.close();
}
state = DISCONNECTED;
connections = None();
connectionId = None();
subscribed = None();
}
void detected(const Future<Option<mesos::MasterInfo>>& future)
{
if (future.isFailed()) {
error("Failed to detect a master: " + future.failure());
return;
}
if (state == CONNECTED || state == SUBSCRIBING || state == SUBSCRIBED) {
// Invoke the disconnected callback if we were previously connected.
mutex.lock()
.then(defer(self(), [this]() {
return async(callbacks.disconnected);
}))
.onAny(lambda::bind(&Mutex::unlock, mutex));
}
// Disconnect any active connections.
disconnect();
Option<mesos::MasterInfo> latest;
if (future.isDiscarded()) {
LOG(INFO) << "Re-detecting master";
master = None();
latest = None();
} else if (future->isNone()) {
LOG(INFO) << "Lost leading master";
master = None();
latest = None();
} else {
const UPID& upid = future.get()->pid();
latest = future.get();
string scheme = "http";
#ifdef USE_SSL_SOCKET
if (process::network::openssl::flags().enabled) {
scheme = "https";
}
#endif // USE_SSL_SOCKET
master = ::URL(
scheme,
upid.address.ip,
upid.address.port,
upid.id +
"/api/v1/scheduler");
LOG(INFO) << "New master detected at " << upid;
connectionId = id::UUID::random();
// Wait for a random duration between 0 and `flags.connectionDelayMax`
// before (re-)connecting with the master.
Duration delay = flags.connectionDelayMax * ((double) os::random()
/ RAND_MAX);
VLOG(1) << "Waiting for " << delay << " before initiating a "
<< "re-(connection) attempt with the master";
process::delay(delay, self(), &MesosProcess::connect, connectionId.get());
}
// Keep detecting masters.
detection = detector->detect(latest)
.onAny(defer(self(), &MesosProcess::detected, lambda::_1));
}
Future<Nothing> _receive()
{
Future<Nothing> future = async(callbacks.received, events);
events = queue<Event>();
return future;
}
// Helper for injecting an ERROR event.
void error(const string& message)
{
Event event;
event.set_type(Event::ERROR);
Event::Error* error = event.mutable_error();
error->set_message(message);
receive(event, true);
}
void drop(const Call& call, const string& message)
{
LOG(WARNING) << "Dropping " << call.type() << ": " << message;
}
void _send(const Call& call, const Future<process::http::Request>& future)
{
if (!future.isReady()) {
LOG(ERROR) << "HTTP authenticatee failed while adding authentication"
<< " header to request: " << future;
return;
}
if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
// It might be possible that the scheduler is retrying. We drop the
// request if we have an ongoing subscribe request in flight or if the
// scheduler is already subscribed.
drop(call, "Scheduler is in state " + stringify(state));
return;
}
if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
// We drop all non-subscribe calls if we are not currently subscribed.
drop(call, "Scheduler is in state " + stringify(state));
return;
}
process::http::Request request = future.get();
if (connections.isNone()) {
drop(call, "Connection to master interrupted");
return;
}
VLOG(1) << "Sending " << call.type() << " call to " << master.get();
Future<process::http::Response> response;
if (call.type() == Call::SUBSCRIBE) {
state = SUBSCRIBING;
// Send a streaming request for Subscribe call.
response = connections->subscribe.send(request, true);
} else {
CHECK_SOME(streamId);
// Set the stream ID associated with this connection.
request.headers["Mesos-Stream-Id"] = streamId->toString();
response = connections->nonSubscribe.send(request);
}
CHECK_SOME(connectionId);
response.onAny(defer(self(),
&Self::__send,
connectionId.get(),
call,
lambda::_1));
}
void __send(
const id::UUID& _connectionId,
const Call& call,
const Future<process::http::Response>& response)
{
// It is possible that we detected a new master before a response could
// be received.
if (connectionId != _connectionId) {
return;
}
CHECK(!response.isDiscarded());
CHECK(state == SUBSCRIBING || state == SUBSCRIBED) << state;
// This can happen during a master failover or a network blip
// causing the socket to timeout. Eventually, the scheduler would
// detect the disconnection via ZK(disconnect()) or lack of heartbeats.
if (response.isFailed()) {
LOG(ERROR) << "Request for call type " << call.type() << " failed: "
<< response.failure();
return;
}
if (response->code == process::http::Status::OK) {
// Only SUBSCRIBE call should get a "200 OK" response.
CHECK_EQ(Call::SUBSCRIBE, call.type());
CHECK_EQ(response->type, process::http::Response::PIPE);
CHECK_SOME(response->reader);
state = SUBSCRIBED;
Pipe::Reader reader = response->reader.get();
auto deserializer =
lambda::bind(deserialize<Event>, contentType, lambda::_1);
Owned<Reader<Event>> decoder(
new Reader<Event>(Decoder<Event>(deserializer), reader));
subscribed = SubscribedResponse {reader, decoder};
// Responses to SUBSCRIBE calls should always include a stream ID.
CHECK(response->headers.contains("Mesos-Stream-Id"));
Try<id::UUID> uuid =
id::UUID::fromString(response->headers.at("Mesos-Stream-Id"));
CHECK_SOME(uuid);
streamId = uuid.get();
read();
return;
}
if (response->code == process::http::Status::ACCEPTED) {
// Only non SUBSCRIBE calls should get a "202 Accepted" response.
CHECK_NE(Call::SUBSCRIBE, call.type());
return;
}
// We reset the state to connected if the subscribe call did not
// succceed (e.g., the master was still recovering). The scheduler can
// then retry the subscribe call.
if (call.type() == Call::SUBSCRIBE) {
state = CONNECTED;
}
if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
// This could happen if the master hasn't realized it is the leader yet
// or is still in the process of recovery.
LOG(WARNING) << "Received '" << response->status << "' ("
<< response->body << ") for " << call.type();
return;
}
if (response->code == process::http::Status::NOT_FOUND) {
// This could happen if the master libprocess process has not yet set up
// HTTP routes.
LOG(WARNING) << "Received '" << response->status << "' ("
<< response->body << ") for " << call.type();
return;
}
if (response->code == process::http::Status::TEMPORARY_REDIRECT) {
// This could happen if the detector detects a new leading master before
// master itself realizes it (e.g., ZK watch delay).
LOG(WARNING) << "Received '" << response->status << "' ("
<< response->body << ") for " << call.type();
return;
}
// We should be able to get here only for AuthN errors which is not
// yet supported for HTTP frameworks.
error("Received unexpected '" + response->status + "' (" +
response->body + ") for " + stringify(call.type()));
}
Future<APIResult> _call(
const Call& callMessage,
process::http::Request request)
{
if (connections.isNone()) {
return Failure("Connection to master interrupted");
}
Future<process::http::Response> response;
CHECK_SOME(streamId);
// Set the stream ID associated with this connection.
request.headers["Mesos-Stream-Id"] = streamId->toString();
CHECK_SOME(connectionId);
return connections->nonSubscribe.send(request)
.then(defer(self(),
&Self::__call,
callMessage,
lambda::_1));
}
Future<APIResult> __call(
const Call& callMessage,
const process::http::Response& response)
{
APIResult result;
result.set_status_code(response.code);
if (response.code == process::http::Status::ACCEPTED) {
// "202 Accepted" responses are asynchronously processed, so the body
// should be empty.
if (!response.body.empty()) {
LOG(WARNING) << "Response for " << callMessage.type()
<< " unexpectedly included body: '" << response.body
<< "'";
}
} else if (response.code == process::http::Status::OK) {
if (!response.body.empty()) {
Try<Response> deserializedResponse =
deserialize<Response>(contentType, response.body);
if (deserializedResponse.isError()) {
return Failure(
"Failed to deserialize the response '" + response.status + "'" +
" (" + response.body + "): " + deserializedResponse.error());
}
*result.mutable_response() = deserializedResponse.get();
}
} else {
result.set_error(
"Received unexpected '" + response.status + "'" + " (" +
response.body + ")");
}
return result;
}
void read()
{
subscribed->decoder->read()
.onAny(defer(self(),
&Self::_read,
subscribed->reader,
lambda::_1));
}
void _read(const Pipe::Reader& reader, const Future<Result<Event>>& event)
{
CHECK(!event.isDiscarded());
// Ignore enqueued events from the previous Subscribe call reader.
if (!subscribed.isSome() || subscribed->reader != reader) {
VLOG(1) << "Ignoring event from old stale connection";
return;
}
CHECK_EQ(SUBSCRIBED, state);
CHECK_SOME(connectionId);
// This could happen if the master failed over while sending a event.
if (event.isFailed()) {
LOG(ERROR) << "Failed to decode the stream of events: "
<< event.failure();
disconnected(connectionId.get(), event.failure());
return;
}
// This could happen if the master failed over after sending an event.
if (event->isNone()) {
const string error = "End-Of-File received from master. The master "
"closed the event stream";
LOG(ERROR) << error;
disconnected(connectionId.get(), error);
return;
}
if (event->isError()) {
error("Failed to de-serialize event: " + event->error());
} else {
receive(event->get(), false);
}
read();
}
void receive(const Event& event, bool isLocallyInjected)
{
// Check if we're are no longer subscribed but received an event.
if (!isLocallyInjected && state != SUBSCRIBED) {
LOG(WARNING) << "Ignoring " << stringify(event.type())
<< " event because we're no longer subscribed";
return;
}
if (isLocallyInjected) {
VLOG(1) << "Enqueuing locally injected event " << stringify(event.type());
} else {
VLOG(1) << "Enqueuing event " << stringify(event.type()) << " received"
<< " from " << master.get();
}
// Queue up the event and invoke the 'received' callback if this
// is the first event (between now and when the 'received'
// callback actually gets invoked more events might get queued).
events.push(event);
if (events.size() == 1) {
mutex.lock()
.then(defer(self(), &Self::_receive))
.onAny(lambda::bind(&Mutex::unlock, mutex));
}
}
private:
struct Callbacks
{
lambda::function<void(void)> connected;
lambda::function<void(void)> disconnected;
lambda::function<void(const queue<Event>&)> received;
};
struct SubscribedResponse
{
Pipe::Reader reader;
process::Owned<Reader<Event>> decoder;
};
enum State
{
DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
CONNECTING, // Trying to establish subscribe and non-subscribe connections.
CONNECTED, // Established subscribe and non-subscribe connections.
SUBSCRIBING, // Trying to subscribe with the master.
SUBSCRIBED // Subscribed with the master.
} state;
friend ostream& operator<<(ostream& stream, State state)
{
switch (state) {
case DISCONNECTED: return stream << "DISCONNECTED";
case CONNECTING: return stream << "CONNECTING";
case CONNECTED: return stream << "CONNECTED";
case SUBSCRIBING: return stream << "SUBSCRIBING";
case SUBSCRIBED: return stream << "SUBSCRIBED";
}
UNREACHABLE();
}
struct Metrics
{
Metrics(const MesosProcess& mesosProcess)
: event_queue_messages(
"scheduler/event_queue_messages",
defer(mesosProcess, &MesosProcess::_event_queue_messages)),
event_queue_dispatches(
"scheduler/event_queue_dispatches",
defer(mesosProcess,
&MesosProcess::_event_queue_dispatches))
{
// TODO(dhamon): When we start checking the return value of 'add' we may
// get failures in situations where multiple SchedulerProcesses are active
// (ie, the fault tolerance tests). At that point we'll need MESOS-1285 to
// be fixed and to use self().id in the metric name.
process::metrics::add(event_queue_messages);
process::metrics::add(event_queue_dispatches);
}
~Metrics()
{
process::metrics::remove(event_queue_messages);
process::metrics::remove(event_queue_dispatches);
}
// Process metrics.
process::metrics::PullGauge event_queue_messages;
process::metrics::PullGauge event_queue_dispatches;
} metrics;
double _event_queue_messages()
{
return static_cast<double>(eventCount<MessageEvent>());
}
double _event_queue_dispatches()
{
return static_cast<double>(eventCount<DispatchEvent>());
}
// There can be multiple simulataneous ongoing (re-)connection attempts with
// the master (e.g., the master failed over while an attempt was in progress).
// This helps us in uniquely identifying the current connection instance and
// ignoring the stale instance.
Option<id::UUID> connectionId; // UUID to identify the connection instance.
Option<Connections> connections;
Option<SubscribedResponse> subscribed;
ContentType contentType;
Callbacks callbacks;
const Option<Credential> credential;
Mutex mutex; // Used to serialize the callback invocations.
bool local; // Whether or not we launched a local cluster.
shared_ptr<MasterDetector> detector;
queue<Event> events;
Option<::URL> master;
Option<id::UUID> streamId;
const Flags flags;
Owned<mesos::http::authentication::Authenticatee> authenticatee;
// Master detection future.
process::Future<Option<mesos::MasterInfo>> detection;
};
Mesos::Mesos(
const string& master,
ContentType contentType,
const lambda::function<void()>& connected,
const lambda::function<void()>& disconnected,
const lambda::function<void(const queue<Event>&)>& received,
const Option<Credential>& credential,
const Option<shared_ptr<MasterDetector>>& detector)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_");
if (load.isError()) {
EXIT(EXIT_FAILURE) << "Failed to load flags: " << load.error();
}
// Log any flag warnings (after logging is initialized).
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
process = new MesosProcess(
master,
contentType,
connected,
disconnected,
received,
credential,
detector,
flags);
spawn(process);
}
Mesos::Mesos(
const string& master,
ContentType contentType,
const lambda::function<void()>& connected,
const lambda::function<void()>& disconnected,
const lambda::function<void(const queue<Event>&)>& received,
const Option<Credential>& credential)
: Mesos(master,
contentType,
connected,
disconnected,
received,
credential,
None()) {}
Mesos::~Mesos()
{
stop();
}
void Mesos::send(const Call& call)
{
dispatch(process, &MesosProcess::send, call);
}
Future<APIResult> Mesos::call(const Call& callMessage)
{
return dispatch(process, &MesosProcess::call, callMessage);
}
void Mesos::reconnect()
{
dispatch(process, &MesosProcess::reconnect);
}
void Mesos::stop()
{
if (process != nullptr) {
// We pass 'false' here to add the termination event at the end of the
// `MesosProcess` queue. This is to ensure all pending dispatches are
// processed. However multistage events, e.g., `Call`, might still be
// dropped, because a continuation (stage) of such event can be dispatched
// after the termination event, see MESOS-9274.
terminate(process, false);
wait(process);
delete process;
process = nullptr;
}
}
} // namespace scheduler {
} // namespace v1 {
} // namespace mesos {