blob: 99a7d0dfff7b0c61decc9ff6d9e6d46ef13a7e75 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <dlfcn.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <iostream>
#include <queue>
#include <string>
#include <sstream>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/scheduler.hpp>
#include <process/async.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/mutex.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/flags.hpp>
#include <stout/hashmap.hpp>
#include <stout/ip.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/recordio.hpp>
#include <stout/uuid.hpp>
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "local/local.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/detector.hpp"
#include "master/validation.hpp"
#include "messages/messages.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::master;
using namespace process;
using std::queue;
using std::string;
using std::vector;
using mesos::internal::recordio::Reader;
using process::Owned;
using process::wait; // Necessary on some OS's to disambiguate.
using process::http::Pipe;
using process::http::post;
using process::http::Response;
using ::recordio::Decoder;
namespace mesos {
namespace v1 {
namespace scheduler {
// The process (below) is responsible for receiving messages
// (eventually events) from the master and sending messages (via
// calls) to the master.
class MesosProcess : public ProtobufProcess<MesosProcess>
{
public:
  // Creates the scheduler-side process.
  //
  // 'master' is either the literal string "local", in which case an
  // in-process cluster is started via local::launch(), or a string that is
  // handed to MasterDetector::create(). 'connected' and 'disconnected' are
  // invoked (through process::async) when a master is detected or lost, and
  // 'received' is invoked (through process::async) with batches of queued
  // events. The program exits if the environment flags cannot be loaded or
  // the master detector cannot be created.
  MesosProcess(
      const string& master,
      ContentType _contentType,
      const lambda::function<void()>& _connected,
      const lambda::function<void()>& _disconnected,
      lambda::function<void(const queue<Event>&)> _received)
    : ProcessBase(ID::generate("scheduler")),
      contentType(_contentType),
      connected(_connected),
      disconnected(_disconnected),
      received(_received),
      local(false)
  {
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    // Load any flags from the environment (we use local::Flags in the
    // event we run in 'local' mode, since it inherits
    // logging::Flags). In the future, just as the TODO in
    // local/main.cpp discusses, we'll probably want a way to load
    // master::Flags and slave::Flags as well.
    local::Flags flags;
    Try<Nothing> load = flags.load("MESOS_");
    if (load.isError()) {
      EXIT(1) << "Failed to load flags: " << load.error();
    }

    // Initialize libprocess (done here since at some point we might
    // want to use flags to initialize libprocess).
    process::initialize();

    if (self().address.ip.isLoopback()) {
      LOG(WARNING) << "\n**************************************************\n"
                   << "Scheduler driver bound to loopback interface!"
                   << " Cannot communicate with remote master(s)."
                   << " You might want to set 'LIBPROCESS_IP' environment"
                   << " variable to use a routable IP address.\n"
                   << "**************************************************";
    }

    // Initialize logging.
    if (flags.initialize_driver_logging) {
      logging::initialize("mesos", flags);
    } else {
      VLOG(1) << "Disabling initialization of GLOG logging";
    }

    LOG(INFO) << "Version: " << MESOS_VERSION;

    // Launch a local cluster if necessary.
    Option<UPID> pid = None();
    if (master == "local") {
      pid = local::launch(flags);
      local = true;
    }

    Try<MasterDetector*> create =
      MasterDetector::create(pid.isSome() ? string(pid.get()) : master);

    if (create.isError()) {
      EXIT(1) << "Failed to create a master detector: " << create.error();
    }

    // Take ownership of the detector so that it is deleted together with
    // this process (it used to be kept as a raw pointer and never freed).
    detector = Owned<MasterDetector>(create.get());
  }

  // Closes any open event stream and shuts down the local cluster if this
  // process launched one. The owned master detector is released afterwards,
  // when the members are destroyed.
  virtual ~MesosProcess()
  {
    disconnect();

    // Check and see if we need to shutdown a local cluster.
    if (local) {
      local::shutdown();
    }

    // Note that we ignore any callbacks that are enqueued.
  }

  // TODO(benh): Move this to 'protected'.
  using ProtobufProcess<MesosProcess>::send;

  // Enqueues 'call' and sends it right away if no other call is in flight.
  // Calls go out one at a time: the next queued call is sent only after the
  // future returned by _send() for the previous one has completed (see
  // ___send()).
  void send(const Call& call)
  {
    // NOTE: We enqueue the calls to guarantee that a call is sent only after
    // a response has been received for the previous call.
    // TODO(vinod): Use HTTP pipelining instead.
    calls.push(call);

    if (calls.size() > 1) {
      return;
    }

    // If this is the first in the queue send the call.
    _send(call)
      .onAny(defer(self(), &Self::___send));
  }

protected:
  // Kicks off master detection once the process has been spawned.
  virtual void initialize()
  {
    // Start detecting masters.
    detector->detect()
      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
  }

  // Handles a master detection result.
  //
  // Any existing event stream is closed. 'master' is updated, and the
  // 'disconnected' or 'connected' callback is scheduled under 'mutex' so
  // that it is serialized with the other callbacks. Detection is then
  // re-armed. If detection fails, an ERROR event is injected and detection
  // is not re-armed.
  void detected(const Future<Option<mesos::MasterInfo>>& future)
  {
    CHECK(!future.isDiscarded());

    if (future.isFailed()) {
      error("Failed to detect a master: " + future.failure());
      return;
    }

    // Disconnect the reader upon a master detection callback.
    disconnect();

    if (future.get().isNone()) {
      master = None();

      VLOG(1) << "No master detected";

      mutex.lock()
        .then(defer(self(), &Self::_detected))
        .onAny(lambda::bind(&Mutex::unlock, mutex));
    } else {
      master = UPID(future.get().get().pid());

      VLOG(1) << "New master detected at " << master.get();

      mutex.lock()
        .then(defer(self(), &Self::__detected))
        .onAny(lambda::bind(&Mutex::unlock, mutex));
    }

    // Keep detecting masters.
    detector->detect(future.get())
      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
  }

  // Runs the user's 'disconnected' callback (called with 'mutex' held).
  Future<Nothing> _detected()
  {
    return async(disconnected);
  }

  // Runs the user's 'connected' callback (called with 'mutex' held).
  Future<Nothing> __detected()
  {
    return async(connected);
  }

  // Hands every queued event to the user's 'received' callback and clears
  // the queue (called with 'mutex' held).
  Future<Nothing> _receive()
  {
    Future<Nothing> future = async(received, events);
    events = queue<Event>();
    return future;
  }

  // Helper for injecting an ERROR event.
  void error(const string& message)
  {
    Event event;
    event.set_type(Event::ERROR);

    Event::Error* error = event.mutable_error();
    error->set_message(message);

    receive(event, true);
  }

  // Logs that 'call' was not sent, together with the reason.
  void drop(const Call& call, const string& message)
  {
    LOG(WARNING) << "Dropping " << call.type() << ": " << message;
  }

  // Sends a single call to the current master over HTTP.
  //
  // Returns an already-satisfied future if the call is dropped, either
  // because no master is known or because the call fails validation.
  // Otherwise the returned future completes once the HTTP response future
  // completes. SUBSCRIBE uses a streaming request on a fresh connection; all
  // other calls use a regular POST.
  Future<Nothing> _send(const Call& call)
  {
    if (master.isNone()) {
      drop(call, "Disconnected");
      return Nothing();
    }

    Option<Error> error = validation::scheduler::call::validate(devolve(call));

    if (error.isSome()) {
      drop(call, error.get().message);
      return Nothing();
    }

    VLOG(1) << "Sending " << call.type() << " call to " << master.get();

    // TODO(vinod): Add support for sending MESSAGE calls directly
    // to the slave, instead of relaying it through the master, as
    // the scheduler driver does.

    const string body = serialize(contentType, call);
    const http::Headers headers{{"Accept", stringify(contentType)}};

    Future<Response> response;

    if (call.type() == Call::SUBSCRIBE) {
      // Each subscription requires a new connection.
      disconnect();

      // Send a streaming request for Subscribe call.
      response = process::http::streaming::post(
          master.get(),
          "api/v1/scheduler",
          headers,
          body,
          stringify(contentType));
    } else {
      response = post(
          master.get(),
          "api/v1/scheduler",
          headers,
          body,
          stringify(contentType));
    }

    return response
      .onAny(defer(self(), &Self::__send, call, lambda::_1))
      .then([]() { return Nothing(); });
  }

  // Handles the HTTP response for 'call'.
  //
  // 200 OK is expected only for SUBSCRIBE and sets up the event stream.
  // 202 Accepted is expected only for other calls. 503 is logged and
  // ignored. Any other status is reported as an injected ERROR event.
  void __send(const Call& call, const Future<Response>& response)
  {
    CHECK(!response.isDiscarded());

    // This can happen during a master failover or a network blip
    // causing the socket to timeout. Eventually, the scheduler would
    // detect the disconnection via ZK(disconnect()) or lack of heartbeats.
    if (response.isFailed()) {
      LOG(ERROR) << "Request for call type " << call.type() << " failed: "
                 << response.failure();
      return;
    }

    if (response->code == process::http::Status::OK) {
      // Only SUBSCRIBE call should get a "200 OK" response.
      CHECK_EQ(Call::SUBSCRIBE, call.type());
      CHECK_EQ(response.get().type, http::Response::PIPE);
      CHECK_SOME(response.get().reader);

      Pipe::Reader reader = response.get().reader.get();

      auto deserializer =
        lambda::bind(deserialize<Event>, contentType, lambda::_1);

      Owned<Reader<Event>> decoder(
          new Reader<Event>(Decoder<Event>(deserializer), reader));

      connection = Connection {reader, decoder};

      read();

      return;
    }

    if (response->code == process::http::Status::ACCEPTED) {
      // Only non SUBSCRIBE calls should get a "202 Accepted" response.
      CHECK_NE(Call::SUBSCRIBE, call.type());
      return;
    }

    if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
      // This could happen if the master hasn't realized it is the leader yet
      // or is still in the process of recovery.
      LOG(WARNING) << "Received '" << response.get().status << "' ("
                   << response.get().body << ") for " << call.type();
      return;
    }

    // We should be able to get here only for AuthN errors which is not
    // yet supported for HTTP frameworks.
    error("Received unexpected '" + response.get().status + "' (" +
          response.get().body + ") for " + stringify(call.type()));
  }

  // Pops the call that just finished and sends the next queued call, if any.
  void ___send()
  {
    CHECK_LT(0u, calls.size());
    calls.pop();

    // Execute the next event in the queue.
    if (!calls.empty()) {
      _send(calls.front())
        .onAny(defer(self(), &Self::___send));
    }
  }

  // Reads the next record from the current event stream. The current pipe
  // reader is bound into the callback so that _read() can recognize results
  // that belong to a stream that has since been replaced.
  void read()
  {
    connection.get().decoder->read()
      .onAny(defer(self(),
                   &Self::_read,
                   connection.get().reader,
                   lambda::_1));
  }

  // Handles one decoded record from the event stream and, unless the stream
  // failed, ended, or is stale, schedules the next read.
  void _read(const Pipe::Reader& reader, const Future<Result<Event>>& event)
  {
    CHECK(!event.isDiscarded());

    // Ignore enqueued events from the previous Subscribe call reader.
    if (!connection.isSome() || connection.get().reader != reader) {
      VLOG(1) << "Ignoring event from old stale connection";
      return;
    }

    // This could happen if the master failed over while sending a response.
    // It's fine to drop this as the scheduler would detect the
    // disconnection via ZK(disconnect) or lack of heartbeats.
    if (event.isFailed()) {
      LOG(ERROR) << "Failed to decode the stream of events: "
                 << event.failure();
      return;
    }

    if (!event.get().isSome()) {
      // It's fine to drop this as the scheduler would detect the
      // disconnection via ZK(disconnect) or lack of heartbeats.
      LOG(ERROR) << "End-Of-File received from master."
                 << " The master closed the event stream";
      return;
    }

    if (event.get().isError()) {
      error("Failed to de-serialize event: " + event.get().error());
    } else {
      receive(event.get().get(), false);
    }

    read();
  }

  // Queues 'event' for delivery to the 'received' callback.
  //
  // Events that arrive from the master while no master is known are
  // dropped. Locally injected events (e.g. errors) are always queued.
  void receive(const Event& event, bool isLocallyInjected)
  {
    // Check if we're disconnected but received an event.
    if (!isLocallyInjected && master.isNone()) {
      LOG(WARNING) << "Ignoring " << stringify(event.type())
                   << " event because we're disconnected";
      return;
    }

    if (isLocallyInjected) {
      VLOG(1) << "Enqueuing locally injected event " << stringify(event.type());
    } else {
      VLOG(1) << "Enqueuing event " << stringify(event.type()) << " received"
              << " from " << master.get();
    }

    // Queue up the event and invoke the 'received' callback if this
    // is the first event (between now and when the 'received'
    // callback actually gets invoked more events might get queued).
    events.push(event);

    if (events.size() == 1) {
      mutex.lock()
        .then(defer(self(), &Self::_receive))
        .onAny(lambda::bind(&Mutex::unlock, mutex));
    }
  }

  // Closes the current event stream (if any) and forgets it.
  void disconnect()
  {
    if (connection.isSome()) {
      if (!connection.get().reader.close()) {
        LOG(WARNING) << "HTTP connection was already closed";
      }
    }

    connection = None();
  }

private:
  // The event stream opened by a successful SUBSCRIBE call.
  struct Connection
  {
    Pipe::Reader reader;
    process::Owned<Reader<Event>> decoder;
  };

  Option<Connection> connection;

  ContentType contentType;

  Mutex mutex; // Used to serialize the callback invocations.

  lambda::function<void()> connected;
  lambda::function<void()> disconnected;
  lambda::function<void(const queue<Event>&)> received;

  bool local; // Whether or not we launched a local cluster.

  // Owns the detector created in the constructor; released with this object.
  Owned<MasterDetector> detector;

  queue<Event> events;

  // Calls waiting to be sent; the front element is the one in flight.
  queue<Call> calls;

  Option<UPID> master;
};
// Builds the process that talks to the master on the caller's behalf and
// starts it running; every argument is forwarded unchanged to MesosProcess.
Mesos::Mesos(
    const string& master,
    ContentType contentType,
    const lambda::function<void()>& connected,
    const lambda::function<void()>& disconnected,
    const lambda::function<void(const queue<Event>&)>& received)
{
  MesosProcess* created =
    new MesosProcess(master, contentType, connected, disconnected, received);

  process = created;
  spawn(process);
}
// Tears down the scheduler process: ask it to terminate, block until it
// has exited, and only then free it. The order of these three calls matters.
Mesos::~Mesos()
{
  terminate(process);
  // Wait for the process to finish before deleting it, so that no handler
  // can still be running on the object being freed.
  wait(process);
  delete process;
}
void Mesos::send(const Call& call)
{
dispatch(process, &MesosProcess::send, call);
}
} // namespace scheduler {
} // namespace v1 {
} // namespace mesos {