blob: b7ce4de4b01a898bb6763dde070fe521e142ee6c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <dlfcn.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <iostream>
#include <string>
#include <sstream>
#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>
#include <process/async.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/mutex.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/flags.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/uuid.hpp>
#include "sasl/authenticatee.hpp"
#include "common/type_utils.hpp"
#include "master/detector.hpp"
#include "local/local.hpp"
#include "master/detector.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "messages/messages.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::master;
using namespace process;
using std::queue;
using std::string;
using std::vector;
using process::wait; // Necessary on some OS's to disambiguate.
namespace mesos {
namespace scheduler {
// The process (below) is responsible for receiving messages
// (eventually events) from the master and sending messages (via
// calls) to the master.
//
// Incoming master messages are translated into 'Event's, queued, and
// handed in batches to the 'received' callback. All user callbacks
// ('connected', 'disconnected' and 'received') are invoked via
// 'async' while 'mutex' is held, so at most one of them runs at a
// time.
class MesosProcess : public ProtobufProcess<MesosProcess>
{
public:
  // 'master' is either "local" (an in-process cluster is launched) or
  // a string handed to MasterDetector::create(). When '_credential' is
  // set, every newly detected master is authenticated with before
  // 'connected' is invoked.
  //
  // NOTE: Errors hit here are not reported immediately; they are
  // recorded in 'initializationError' and injected as an ERROR event
  // from 'initialize()' (see the comment there).
  MesosProcess(
      const string& master,
      const Option<Credential>& _credential,
      const lambda::function<void(void)>& _connected,
      const lambda::function<void(void)>& _disconnected,
      lambda::function<void(const queue<Event>&)> _received)
    : ProcessBase(ID::generate("scheduler")),
      credential(_credential),
      connected(_connected),
      disconnected(_disconnected),
      received(_received),
      local(false),
      failover(true),
      detector(NULL),
      authenticatee(NULL),
      authenticating(None()),
      authenticated(false),
      reauthenticate(false)
  {
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    // Load any flags from the environment (we use local::Flags in the
    // event we run in 'local' mode, since it inherits
    // logging::Flags). In the future, just as the TODO in
    // local/main.cpp discusses, we'll probably want a way to load
    // master::Flags and slave::Flags as well.
    local::Flags flags;

    Try<Nothing> load = flags.load("MESOS_");

    if (load.isError()) {
      // We cannot inject the ERROR event from the constructor: this
      // process is only spawned after construction completes, so the
      // deferred dispatch that hands the event to the 'received'
      // callback would target a process that does not exist yet (and
      // 'mutex' would never be unlocked). Defer it to 'initialize()'.
      initializationError = "Failed to load flags: " + load.error();
      return;
    }

    // Initialize libprocess (done here since at some point we might
    // want to use flags to initialize libprocess).
    process::initialize();

    // Initialize logging.
    if (flags.initialize_driver_logging) {
      logging::initialize("mesos", flags);
    } else {
      VLOG(1) << "Disabling initialization of GLOG logging";
    }

    LOG(INFO) << "Version: " << MESOS_VERSION;

    // Launch a local cluster if necessary.
    Option<UPID> pid = None();
    if (master == "local") {
      pid = local::launch(flags);
      local = true;
    }

    Try<MasterDetector*> detector_ =
      MasterDetector::create(pid.isSome() ? string(pid.get()) : master);

    if (detector_.isError()) {
      // See the comment above on why this is not reported right away.
      initializationError =
        "Failed to create a master detector: " + detector_.error();
      return;
    }

    // Save the detector so we can delete it in the destructor.
    detector = detector_.get();
  }

  virtual ~MesosProcess()
  {
    delete authenticatee;

    // Release the detector created in the constructor. It is NULL if
    // the constructor bailed out early, which 'delete' tolerates.
    delete detector;

    // Check and see if we need to shutdown a local cluster.
    if (local) {
      local::shutdown();
    }

    // Wait for any callbacks to finish.
    mutex.lock().await();
  }

  // TODO(benh): Move this to 'protected'.
  using ProtobufProcess<MesosProcess>::send;

  // Validates 'call' and translates it into the corresponding internal
  // message, which is sent to the currently detected master. Calls
  // that cannot be sent (no master, missing or uninitialized fields)
  // are dropped via 'drop()'; for a LAUNCH this reports the tasks back
  // as TASK_LOST updates. Within a LAUNCH, individually invalid tasks
  // are dropped the same way while the valid ones are still sent.
  void send(Call call)
  {
    if (master.isNone()) {
      drop(call, "Disconnected");
      return;
    }

    // If no user was specified in FrameworkInfo, use the current user.
    // TODO(benh): Make FrameworkInfo.user be optional and add a
    // 'user' to either TaskInfo or CommandInfo.
    if (call.framework_info().user() == "") {
      Result<string> user = os::user();
      CHECK_SOME(user);

      call.mutable_framework_info()->set_user(user.get());
    }

    // Only a REGISTER should not have set the framework ID.
    if (call.type() != Call::REGISTER && !call.framework_info().has_id()) {
      drop(call, "Call is missing FrameworkInfo.id");
      return;
    }

    if (!call.IsInitialized()) {
      drop(call, "Call is not properly initialized: " +
           call.InitializationErrorString());
      return;
    }

    switch (call.type()) {
      case Call::REGISTER: {
        RegisterFrameworkMessage message;
        message.mutable_framework()->CopyFrom(call.framework_info());
        send(master.get(), message);
        break;
      }

      case Call::REREGISTER: {
        ReregisterFrameworkMessage message;
        message.mutable_framework()->CopyFrom(call.framework_info());
        message.set_failover(failover);
        send(master.get(), message);
        break;
      }

      case Call::UNREGISTER: {
        UnregisterFrameworkMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        send(master.get(), message);
        break;
      }

      case Call::REQUEST: {
        if (!call.has_request()) {
          drop(call, "Expecting 'request' to be present");
          return;
        }
        ResourceRequestMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_requests()->CopyFrom(call.request().requests());
        send(master.get(), message);
        break;
      }

      case Call::DECLINE: {
        if (!call.has_decline()) {
          drop(call, "Expecting 'decline' to be present");
          return;
        }
        // NOTE: There is no dedicated decline message here; DECLINE is
        // sent as a LaunchTasksMessage that carries no tasks. Presumably
        // the master treats launching nothing on the offers as declining
        // them (subject to 'filters').
        LaunchTasksMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_filters()->CopyFrom(call.decline().filters());
        message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids());
        send(master.get(), message);
        break;
      }

      case Call::REVIVE: {
        ReviveOffersMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        send(master.get(), message);
        break;
      }

      case Call::LAUNCH: {
        if (!call.has_launch()) {
          drop(call, "Expecting 'launch' to be present");
          return;
        }

        // We do some local validation here, but really this should
        // all happen in the master so it's only implemented once.
        vector<TaskInfo> tasks;

        foreach (const TaskInfo& task, call.launch().task_infos()) {
          // Check that each TaskInfo has either an ExecutorInfo or a
          // CommandInfo but not both.
          if (task.has_executor() == task.has_command()) {
            drop(task,
                 "TaskInfo must have either an 'executor' or a 'command'");
            continue;
          }

          // Ensure ExecutorInfo::framework_id is valid, if present.
          if (task.has_executor() &&
              task.executor().has_framework_id() &&
              !(task.executor().framework_id() == call.framework_info().id())) {
            drop(task,
                 "ExecutorInfo has an invalid FrameworkID (Actual: " +
                 stringify(task.executor().framework_id()) + " vs Expected: " +
                 stringify(call.framework_info().id()) + ")");
            continue;
          }

          tasks.push_back(task);

          // Set ExecutorInfo::framework_id if missing since this
          // field was added to the API later and thus was made
          // optional.
          if (task.has_executor() && !task.executor().has_framework_id()) {
            tasks.back().mutable_executor()->mutable_framework_id()->CopyFrom(
                call.framework_info().id());
          }
        }

        LaunchTasksMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_filters()->CopyFrom(call.launch().filters());
        message.mutable_offer_ids()->CopyFrom(call.launch().offer_ids());

        foreach (const TaskInfo& task, tasks) {
          message.add_tasks()->CopyFrom(task);
        }

        send(master.get(), message);
        break;
      }

      case Call::KILL: {
        if (!call.has_kill()) {
          drop(call, "Expecting 'kill' to be present");
          return;
        }
        KillTaskMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_task_id()->CopyFrom(call.kill().task_id());
        send(master.get(), message);
        break;
      }

      case Call::ACKNOWLEDGE: {
        if (!call.has_acknowledge()) {
          drop(call, "Expecting 'acknowledge' to be present");
          return;
        }
        StatusUpdateAcknowledgementMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
        message.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
        message.set_uuid(call.acknowledge().uuid());
        send(master.get(), message);
        break;
      }

      case Call::RECONCILE: {
        if (!call.has_reconcile()) {
          drop(call, "Expecting 'reconcile' to be present");
          return;
        }
        ReconcileTasksMessage message;
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_statuses()->CopyFrom(call.reconcile().statuses());
        send(master.get(), message);
        break;
      }

      case Call::MESSAGE: {
        if (!call.has_message()) {
          drop(call, "Expecting 'message' to be present");
          return;
        }
        FrameworkToExecutorMessage message;
        message.mutable_slave_id()->CopyFrom(call.message().slave_id());
        message.mutable_framework_id()->CopyFrom(call.framework_info().id());
        message.mutable_executor_id()->CopyFrom(call.message().executor_id());
        message.set_data(call.message().data());
        send(master.get(), message);
        break;
      }

      default:
        VLOG(1) << "Unexpected call " << stringify(call.type());
        break;
    }
  }

protected:
  // Installs the master message handlers, reports any error recorded
  // by the constructor, and otherwise starts master detection.
  virtual void initialize()
  {
    install<FrameworkRegisteredMessage>(&MesosProcess::receive);
    install<FrameworkReregisteredMessage>(&MesosProcess::receive);
    install<ResourceOffersMessage>(&MesosProcess::receive);
    install<RescindResourceOfferMessage>(&MesosProcess::receive);
    install<StatusUpdateMessage>(&MesosProcess::receive);
    install<LostSlaveMessage>(&MesosProcess::receive);
    install<ExecutorToFrameworkMessage>(&MesosProcess::receive);
    install<FrameworkErrorMessage>(&MesosProcess::receive);

    // The constructor failed (and left 'detector' NULL). Now that the
    // process is running the ERROR event can actually be delivered.
    // There is no detector, so there is nothing to detect.
    if (initializationError.isSome()) {
      error(initializationError.get());
      return;
    }

    CHECK(detector != NULL);

    // Start detecting masters.
    detector->detect()
      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
  }

  // Handles a change of leading master. A lost master triggers the
  // 'disconnected' callback. A new master triggers authentication when
  // a credential is set; otherwise it triggers 'connected'. Detection
  // is then re-armed.
  void detected(const Future<Option<MasterInfo> >& future)
  {
    CHECK(!future.isDiscarded());

    if (future.isFailed()) {
      error("Failed to detect a master: " + future.failure());
      return;
    }

    if (future.get().isNone()) {
      master = None();

      VLOG(1) << "No master detected";

      mutex.lock()
        .then(defer(self(), &Self::_detected))
        .onAny(lambda::bind(&Mutex::unlock, mutex));
    } else {
      master = UPID(future.get().get().pid());

      VLOG(1) << "New master detected at " << master.get();

      if (credential.isSome()) {
        // Authenticate with the master.
        authenticate();
      } else {
        mutex.lock()
          .then(defer(self(), &Self::__detected))
          .onAny(lambda::bind(&Mutex::unlock, mutex));
      }
    }

    // Keep detecting masters.
    detector->detect(future.get())
      .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
  }

  // Runs the user's 'disconnected' callback (called with 'mutex' held).
  Future<Nothing> _detected()
  {
    return async(disconnected);
  }

  // Runs the user's 'connected' callback (called with 'mutex' held).
  Future<Nothing> __detected()
  {
    return async(connected);
  }

  // Starts an authentication attempt with the current master. If an
  // attempt is already in flight, it is discarded instead and a retry
  // is requested via 'reauthenticate' (handled in '_authenticate()').
  void authenticate()
  {
    authenticated = false;

    // We retry to authenticate and it's possible that we'll get
    // disconnected while that is happening.
    if (master.isNone()) {
      return;
    }

    if (authenticating.isSome()) {
      // Authentication is in progress. Try to cancel it.
      // Note that it is possible that 'authenticating' is ready
      // and the dispatch to '_authenticate' is enqueued when we
      // are here, making the 'discard' here a no-op. This is ok
      // because we set 'reauthenticate' here which enforces a retry
      // in '_authenticate'.
      Future<bool>(authenticating.get()).discard();
      reauthenticate = true;
      return;
    }

    VLOG(1) << "Authenticating with master " << master.get();

    CHECK_SOME(credential);

    CHECK(authenticatee == NULL);
    authenticatee = new sasl::Authenticatee(credential.get(), self());

    // NOTE: We do not pass 'Owned<Authenticatee>' here because doing
    // so could make 'AuthenticateeProcess' responsible for deleting
    // 'Authenticatee' causing a deadlock because the destructor of
    // 'Authenticatee' waits on 'AuthenticateeProcess'.
    // This will happen in the following scenario:
    // --> 'AuthenticateeProcess' does a 'Future.set()'.
    // --> '_authenticate()' is dispatched to this process.
    // --> This process executes '_authenticate()'.
    // --> 'AuthenticateeProcess' removes the onAny callback
    //     from its queue which holds the last reference to
    //     'Authenticatee'.
    // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
    // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
    authenticating = authenticatee->authenticate(master.get())
      .onAny(defer(self(), &Self::_authenticate));

    delay(Seconds(5),
          self(),
          &Self::authenticationTimeout,
          authenticating.get());
  }

  // Completion handler for an authentication attempt. Depending on the
  // outcome it ignores the result (no master), retries, reports a
  // refusal as an ERROR event, or triggers 'connected' on success.
  void _authenticate()
  {
    delete CHECK_NOTNULL(authenticatee);
    authenticatee = NULL;

    CHECK_SOME(authenticating);

    // Copy the future rather than referencing into 'authenticating':
    // 'authenticating' is reset below, which would otherwise leave a
    // reference pointing into an emptied Option.
    const Future<bool> future = authenticating.get();

    if (master.isNone()) {
      VLOG(1) << "Ignoring authentication because no master is detected";
      authenticating = None();

      // Set it to false because we do not want further retries until
      // a new master is detected.
      // We obviously do not need to reauthenticate either even if
      // 'reauthenticate' is currently true because the master is
      // lost.
      reauthenticate = false;
      return;
    }

    if (reauthenticate || !future.isReady()) {
      VLOG(1)
        << "Failed to authenticate with master " << master.get() << ": "
        << (reauthenticate ? "master changed" :
           (future.isFailed() ? future.failure() : "future discarded"));

      authenticating = None();
      reauthenticate = false;

      // TODO(vinod): Add a limit on number of retries.
      dispatch(self(), &Self::authenticate); // Retry.
      return;
    }

    if (!future.get()) {
      VLOG(1) << "Master " << master.get() << " refused authentication";

      // Clear the in-progress marker so that a later 'authenticate()'
      // (e.g., after a new master is detected) starts a fresh attempt.
      // Otherwise it would only set 'reauthenticate' on this
      // already-completed future, and no retry would ever happen.
      authenticating = None();

      error("Authentication refused");
      return;
    }

    VLOG(1) << "Successfully authenticated with master " << master.get();

    authenticated = true;
    authenticating = None();

    mutex.lock()
      .then(defer(self(), &Self::__authenticate))
      .onAny(lambda::bind(&Mutex::unlock, mutex));
  }

  // Runs the user's 'connected' callback after a successful
  // authentication (called with 'mutex' held).
  Future<Nothing> __authenticate()
  {
    return async(connected);
  }

  // Fired by the 5 second timer armed in 'authenticate()'.
  void authenticationTimeout(Future<bool> future)
  {
    // NOTE: Discarded future results in a retry in '_authenticate()'.
    // Also note that a 'discard' here is safe even if another
    // authenticator is in progress because this copy of the future
    // corresponds to the original authenticator that started the timer.
    if (future.discard()) { // This is a no-op if the future is already ready.
      LOG(WARNING) << "Authentication timed out";
    }
  }

  // Filters and enqueues an event for the 'received' callback.
  // Events from a non-leading master are ignored. While disconnected,
  // only REGISTERED/REREGISTERED events from a remote sender are kept.
  // NOTE: A None 'from' is possible when an event is injected locally.
  void receive(const Option<UPID>& from, const Event& event)
  {
    // Check if we're disconnected but received an event.
    if (from.isSome() && master.isNone()) {
      // Ignore the event unless it's a registered or reregistered.
      if (event.type() != Event::REGISTERED &&
          event.type() != Event::REREGISTERED) {
        VLOG(1) << "Ignoring " << stringify(event.type())
                << " event because we're disconnected";
        return;
      }
    } else if (from.isSome() && master != from) {
      VLOG(1)
        << "Ignoring " << stringify(event.type())
        << " event because it was sent from '" << from.get()
        << "' instead of the leading master '" << master.get() << "'";
      return;
    }

    // Note that if 'from' is None we're locally injecting this event
    // so we always want to enqueue it even if we're not connected!

    VLOG(1) << "Enqueuing event " << stringify(event.type()) << " from "
            << (from.isNone() ? "(locally injected)" : from.get());

    // Queue up the event and invoke the 'received' callback if this
    // is the first event (between now and when the 'received'
    // callback actually gets invoked more events might get queued).
    events.push(event);

    if (events.size() == 1) {
      mutex.lock()
        .then(defer(self(), &Self::_receive))
        .onAny(lambda::bind(&Mutex::unlock, mutex));
    }
  }

  // Hands every queued event to the 'received' callback (with 'mutex'
  // held) and empties the queue.
  Future<Nothing> _receive()
  {
    Future<Nothing> future = async(received, events);
    events = queue<Event>();
    return future;
  }

  void receive(const UPID& from, const FrameworkRegisteredMessage& message)
  {
    // We've now registered at least once with the master so we're no
    // longer failing over. See the comment where 'failover' is
    // declared for further details.
    failover = false;

    Event event;
    event.set_type(Event::REGISTERED);

    Event::Registered* registered = event.mutable_registered();

    registered->mutable_framework_id()->CopyFrom(message.framework_id());
    registered->mutable_master_info()->CopyFrom(message.master_info());

    receive(from, event);
  }

  void receive(const UPID& from, const FrameworkReregisteredMessage& message)
  {
    // We've now registered at least once with the master so we're no
    // longer failing over. See the comment where 'failover' is
    // declared for further details.
    failover = false;

    Event event;
    event.set_type(Event::REREGISTERED);

    Event::Reregistered* reregistered = event.mutable_reregistered();

    reregistered->mutable_framework_id()->CopyFrom(message.framework_id());
    reregistered->mutable_master_info()->CopyFrom(message.master_info());

    receive(from, event);
  }

  void receive(const UPID& from, const ResourceOffersMessage& message)
  {
    Event event;
    event.set_type(Event::OFFERS);

    Event::Offers* offers = event.mutable_offers();

    offers->mutable_offers()->CopyFrom(message.offers());

    receive(from, event);
  }

  void receive(const UPID& from, const RescindResourceOfferMessage& message)
  {
    Event event;
    event.set_type(Event::RESCIND);

    Event::Rescind* rescind = event.mutable_rescind();

    rescind->mutable_offer_id()->CopyFrom(message.offer_id());

    receive(from, event);
  }

  // Flattens the StatusUpdate envelope (slave/executor IDs, timestamp)
  // into the TaskStatus carried by the UPDATE event.
  void receive(const UPID& from, const StatusUpdateMessage& message)
  {
    Event event;
    event.set_type(Event::UPDATE);

    Event::Update* update = event.mutable_update();

    update->mutable_status()->CopyFrom(message.update().status());

    if (message.update().has_slave_id()) {
      update->mutable_status()->mutable_slave_id()->CopyFrom(
          message.update().slave_id());
    }

    if (message.update().has_executor_id()) {
      update->mutable_status()->mutable_executor_id()->CopyFrom(
          message.update().executor_id());
    }

    update->mutable_status()->set_timestamp(message.update().timestamp());

    update->set_uuid(message.update().uuid());

    receive(from, event);
  }

  void receive(const UPID& from, const LostSlaveMessage& message)
  {
    Event event;
    event.set_type(Event::FAILURE);

    Event::Failure* failure = event.mutable_failure();

    failure->mutable_slave_id()->CopyFrom(message.slave_id());

    receive(from, event);
  }

  void receive(const UPID& from, const ExecutorToFrameworkMessage& _message)
  {
    Event event;
    event.set_type(Event::MESSAGE);

    Event::Message* message = event.mutable_message();

    message->mutable_slave_id()->CopyFrom(_message.slave_id());
    message->mutable_executor_id()->CopyFrom(_message.executor_id());
    message->set_data(_message.data());

    receive(from, event);
  }

  void receive(const UPID& from, const FrameworkErrorMessage& message)
  {
    Event event;
    event.set_type(Event::ERROR);

    Event::Error* error = event.mutable_error();

    error->set_message(message.message());

    receive(from, event);
  }

  // Helper for injecting an ERROR event.
  void error(const string& message)
  {
    Event event;
    event.set_type(Event::ERROR);

    Event::Error* error = event.mutable_error();

    error->set_message(message);

    receive(None(), event);
  }

  // Helper for "dropping" a task that was launched: injects a locally
  // generated TASK_LOST update carrying 'message'.
  void drop(const TaskInfo& task, const string& message)
  {
    Event event;
    event.set_type(Event::UPDATE);

    Event::Update* update = event.mutable_update();

    TaskStatus* status = update->mutable_status();
    status->mutable_task_id()->CopyFrom(task.task_id());
    status->set_state(TASK_LOST);
    status->set_message(message);
    status->set_timestamp(Clock::now().secs());

    update->set_uuid(UUID::random().toBytes());

    receive(None(), event);
  }

  // Logs that 'call' was not sent; for LAUNCH, also reports each task.
  void drop(const Call& call, const string& message)
  {
    VLOG(1) << "Dropping " << stringify(call.type()) << ": " << message;

    switch (call.type()) {
      case Call::LAUNCH: {
        // We drop the tasks preemptively (enqueuing update events that
        // put the task in TASK_LOST). This is a hack for now, to keep
        // the tasks from being forever in PENDING state, when
        // actually the master never received the launch.
        // Unfortunately this is insufficient since it doesn't capture
        // the case when the scheduler process sends it but the master
        // never receives it (i.e., during a master failover). In the
        // future, this should be solved by higher-level abstractions
        // and this hack should be considered for removal.
        foreach (const TaskInfo& task, call.launch().task_infos()) {
          drop(task, message);
        }
        break;
      }

      default:
        break;
    }
  }

private:
  const Option<Credential> credential;

  Mutex mutex; // Used to serialize the callback invocations.

  lambda::function<void(void)> connected;
  lambda::function<void(void)> disconnected;
  lambda::function<void(const queue<Event>&)> received;

  bool local; // Whether or not we launched a local cluster.

  // Whether or not this is the first time we've sent a
  // REREGISTER. This is to maintain compatibility with what the
  // master expects from SchedulerProcess. After the first REGISTERED
  // or REREGISTERED event we force this to be false.
  bool failover;

  // Owned; created in the constructor, deleted in the destructor.
  // NULL if the constructor failed before creating it.
  MasterDetector* detector;

  queue<Event> events;

  Option<UPID> master;

  // Non-NULL only while an authentication attempt is outstanding.
  sasl::Authenticatee* authenticatee;

  // Indicates if an authentication attempt is in progress.
  Option<Future<bool> > authenticating;

  // Indicates if the authentication is successful.
  // NOTE(review): this flag is written but never read in this class
  // (e.g., 'send()' does not consult it).
  bool authenticated;

  // Indicates if a new authentication attempt should be enforced.
  bool reauthenticate;

  // Error encountered while constructing this process, if any. It is
  // injected as an ERROR event from 'initialize()', once the process
  // has been spawned and events can actually be delivered.
  Option<string> initializationError;
};
// Creates and spawns a MesosProcess that will never authenticate:
// without a credential it reports 'connected' as soon as a master is
// detected.
Mesos::Mesos(
    const string& master,
    const lambda::function<void(void)>& connected,
    const lambda::function<void(void)>& disconnected,
    const lambda::function<void(const queue<Event>&)>& received)
{
  const Option<Credential> noCredential = None();

  MesosProcess* created =
    new MesosProcess(master, noCredential, connected, disconnected, received);

  process = created;
  spawn(process);
}
// Creates and spawns a MesosProcess that authenticates with every newly
// detected master using 'credential' before reporting 'connected'.
Mesos::Mesos(
    const string& master,
    const Credential& credential,
    const lambda::function<void(void)>& connected,
    const lambda::function<void(void)>& disconnected,
    const lambda::function<void(const queue<Event>&)>& received)
{
  MesosProcess* created =
    new MesosProcess(master, credential, connected, disconnected, received);

  process = created;
  spawn(process);
}
// Stops the underlying process, blocks until it has exited, and only
// then frees it.
Mesos::~Mesos()
{
  MesosProcess* doomed = process;

  terminate(doomed);
  wait(doomed);

  delete doomed;
}
void Mesos::send(const Call& call)
{
dispatch(process, &MesosProcess::send, call);
}
} // namespace scheduler {
} // namespace mesos {