/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdint.h>
#include <algorithm>
#include <cctype>
#include <fstream>
#include <iomanip>
#include <list>
#include <sstream>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/run.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/check.hpp>
#include <stout/error.hpp>
#include <stout/lambda.hpp>
#include <stout/memory.hpp>
#include <stout/multihashmap.hpp>
#include <stout/net.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "authorizer/authorizer.hpp"
#include "sasl/authenticator.hpp"
#include "common/build.hpp"
#include "common/date_utils.hpp"
#include "common/protobuf_utils.hpp"
#include "common/status_utils.hpp"
#include "credentials/credentials.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/allocator.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
using std::list;
using std::string;
using std::vector;
using process::await;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Clock;
using process::ExitedEvent;
using process::Failure;
using process::Future;
using process::MessageEvent;
using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
using process::Time;
using process::Timer;
using process::UPID;
using process::metrics::Counter;
using memory::shared_ptr;
namespace mesos {
namespace internal {
namespace master {
using allocator::Allocator;
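// Periodically reads the slave whitelist file (if any) and notifies the
// allocator whenever the whitelist changes; a path of '*' means offers
// are advertised for all slaves.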
class WhitelistWatcher : public Process<WhitelistWatcher> {
public:
WhitelistWatcher(const string& _path, Allocator* _allocator)
: ProcessBase(process::ID::generate("whitelist")),
path(_path),
allocator(_allocator) {}
protected:
virtual void initialize()
{
watch();
}
void watch()
{
// Get the list of whitelisted slaves.
Option<hashset<string> > whitelist;
if (path == "*") { // Accept all slaves.
VLOG(1) << "No whitelist given. Advertising offers for all slaves";
} else {
// Read from local file.
// TODO(vinod): Add support for reading from ZooKeeper.
// TODO(vinod): Ensure this read is atomic w.r.t external
// writes/updates to this file.
Try<string> read = os::read(
strings::remove(path, "file://", strings::PREFIX));
if (read.isError()) {
LOG(ERROR) << "Error reading whitelist file: " << read.error() << ". "
<< "Retrying";
whitelist = lastWhitelist;
} else if (read.get().empty()) {
LOG(WARNING) << "Empty whitelist file " << path << ". "
<< "No offers will be made!";
whitelist = hashset<string>();
} else {
hashset<string> hostnames;
vector<string> lines = strings::tokenize(read.get(), "\n");
foreach (const string& hostname, lines) {
hostnames.insert(hostname);
}
whitelist = hostnames;
}
}
// Send the whitelist to allocator, if necessary.
if (whitelist != lastWhitelist) {
allocator->updateWhitelist(whitelist);
}
// Check again.
lastWhitelist = whitelist;
delay(WHITELIST_WATCH_INTERVAL, self(), &WhitelistWatcher::watch);
}
private:
const string path;
Allocator* allocator;
Option<hashset<string> > lastWhitelist;
};
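// Monitors the health of a slave by sending periodic PINGs; after
// MAX_SLAVE_PING_TIMEOUTS consecutive unanswered pings it asks the
// master to shut the slave down.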
class SlaveObserver : public Process<SlaveObserver>
{
public:
SlaveObserver(const UPID& _slave,
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
const PID<Master>& _master)
: ProcessBase(process::ID::generate("slave-observer")),
slave(_slave),
slaveInfo(_slaveInfo),
slaveId(_slaveId),
master(_master),
timeouts(0),
pinged(false)
{
install("PONG", &SlaveObserver::pong);
}
protected:
virtual void initialize()
{
send(slave, "PING");
pinged = true;
delay(SLAVE_PING_TIMEOUT, self(), &SlaveObserver::timeout);
}
void pong(const UPID& from, const string& body)
{
timeouts = 0;
pinged = false;
}
void timeout()
{
if (pinged) { // So we haven't got back a pong yet ...
if (++timeouts >= MAX_SLAVE_PING_TIMEOUTS) {
shutdown();
return;
}
}
send(slave, "PING");
pinged = true;
delay(SLAVE_PING_TIMEOUT, self(), &SlaveObserver::timeout);
}
void shutdown()
{
dispatch(master, &Master::shutdownSlave, slaveId, "health check timed out");
}
private:
const UPID slave;
const SlaveInfo slaveInfo;
const SlaveID slaveId;
const PID<Master> master;
uint32_t timeouts;
bool pinged;
};
Master::Master(
Allocator* _allocator,
Registrar* _registrar,
Repairer* _repairer,
Files* _files,
MasterContender* _contender,
MasterDetector* _detector,
const Option<Authorizer*>& _authorizer,
const Flags& _flags)
: ProcessBase("master"),
http(this),
flags(_flags),
allocator(_allocator),
registrar(_registrar),
repairer(_repairer),
files(_files),
contender(_contender),
detector(_detector),
authorizer(_authorizer),
metrics(*this),
electedTime(None())
{
// NOTE: We populate 'info_' here instead of inside 'initialize()'
// because 'StandaloneMasterDetector' needs access to the info.
// The master ID is currently comprised of the current date, the IP
// address and port from self() and the OS PID.
Try<string> id =
strings::format("%s-%u-%u-%d", DateUtils::currentDate(),
self().ip, self().port, getpid());
CHECK(!id.isError()) << id.error();
info_.set_id(id.get());
info_.set_ip(self().ip);
info_.set_port(self().port);
info_.set_pid(self());
// Determine our hostname or use the hostname provided.
string hostname;
if (flags.hostname.isNone()) {
Try<string> result = net::getHostname(self().ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
}
hostname = result.get();
} else {
hostname = flags.hostname.get();
}
info_.set_hostname(hostname);
}
Master::~Master() {}
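// Validates flags, loads credentials, configures framework rate limiters
// and roles, installs protobuf message handlers and HTTP routes, and
// kicks off leader contention and detection.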
void Master::initialize()
{
LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
<< " started on " << string(self()).substr(7);
// NOTE: We enforce a minimum slave re-register timeout because the
// slave bounds its (re-)registration retries based on the minimum.
if (flags.slave_reregister_timeout < MIN_SLAVE_REREGISTER_TIMEOUT) {
EXIT(1) << "Invalid value '" << flags.slave_reregister_timeout << "' "
<< "for --slave_reregister_timeout: "
<< "Must be at least " << MIN_SLAVE_REREGISTER_TIMEOUT;
}
// Parse the percentage for the slave removal limit.
// TODO(bmahler): Add a 'Percentage' abstraction.
if (!strings::endsWith(flags.recovery_slave_removal_limit, "%")) {
EXIT(1) << "Invalid value '" << flags.recovery_slave_removal_limit << "' "
<< "for --recovery_slave_removal_percent_limit: " << "missing '%'";
}
Try<double> limit = numify<double>(
strings::remove(
flags.recovery_slave_removal_limit,
"%",
strings::SUFFIX));
if (limit.isError()) {
EXIT(1) << "Invalid value '" << flags.recovery_slave_removal_limit << "' "
<< "for --recovery_slave_removal_percent_limit: " << limit.error();
}
if (limit.get() < 0.0 || limit.get() > 100.0) {
EXIT(1) << "Invalid value '" << flags.recovery_slave_removal_limit << "' "
<< "for --recovery_slave_removal_percent_limit: "
<< "Must be within [0%-100%]";
}
// Log authentication state.
if (flags.authenticate_frameworks) {
LOG(INFO) << "Master only allowing authenticated frameworks to register";
} else {
LOG(INFO) << "Master allowing unauthenticated frameworks to register";
}
if (flags.authenticate_slaves) {
LOG(INFO) << "Master only allowing authenticated slaves to register";
} else {
LOG(INFO) << "Master allowing unauthenticated slaves to register";
}
// Load credentials.
if (flags.credentials.isSome()) {
const string& path =
strings::remove(flags.credentials.get(), "file://", strings::PREFIX);
Result<Credentials> _credentials = credentials::read(path);
if (_credentials.isError()) {
EXIT(1) << _credentials.error() << " (see --credentials flag)";
} else if (_credentials.isNone()) {
EXIT(1) << "Credentials file must contain at least one credential"
<< " (see --credentials flag)";
}
// Store credentials in master to use them in routes.
credentials = _credentials.get();
// Load "registration" credentials into SASL based Authenticator.
sasl::secrets::load(_credentials.get());
} else if (flags.authenticate_frameworks || flags.authenticate_slaves) {
EXIT(1) << "Authentication requires a credentials file"
<< " (see --credentials flag)";
}
if (authorizer.isSome()) {
LOG(INFO) << "Authorization enabled";
}
if (flags.rate_limits.isSome()) {
// Add framework rate limiters.
foreach (const RateLimit& limit_, flags.rate_limits.get().limits()) {
if (limiters.contains(limit_.principal())) {
EXIT(1) << "Duplicate principal " << limit_.principal()
<< " found in RateLimits configuration";
}
if (limit_.has_qps() && limit_.qps() <= 0) {
EXIT(1) << "Invalid qps: " << limit_.qps()
<< ". It must be a positive number";
}
if (limit_.has_qps()) {
Option<uint64_t> capacity;
if (limit_.has_capacity()) {
capacity = limit_.capacity();
}
limiters.put(
limit_.principal(),
Owned<BoundedRateLimiter>(
new BoundedRateLimiter(limit_.qps(), capacity)));
} else {
limiters.put(limit_.principal(), None());
}
}
if (flags.rate_limits.get().has_aggregate_default_qps() &&
flags.rate_limits.get().aggregate_default_qps() <= 0) {
EXIT(1) << "Invalid aggregate_default_qps: "
<< flags.rate_limits.get().aggregate_default_qps()
<< ". It must be a positive number";
}
if (flags.rate_limits.get().has_aggregate_default_qps()) {
Option<uint64_t> capacity;
if (flags.rate_limits.get().has_aggregate_default_capacity()) {
capacity = flags.rate_limits.get().aggregate_default_capacity();
}
defaultLimiter = Owned<BoundedRateLimiter>(
new BoundedRateLimiter(
flags.rate_limits.get().aggregate_default_qps(), capacity));
}
LOG(INFO) << "Framework rate limiting enabled";
}
hashmap<string, RoleInfo> roleInfos;
// Add the default role.
RoleInfo roleInfo;
roleInfo.set_name("*");
roleInfos["*"] = roleInfo;
// Add other roles.
if (flags.roles.isSome()) {
vector<string> tokens = strings::tokenize(flags.roles.get(), ",");
foreach (const std::string& role, tokens) {
RoleInfo roleInfo;
roleInfo.set_name(role);
roleInfos[role] = roleInfo;
}
}
// Add role weights.
if (flags.weights.isSome()) {
vector<string> tokens = strings::tokenize(flags.weights.get(), ",");
foreach (const std::string& token, tokens) {
vector<string> pair = strings::tokenize(token, "=");
if (pair.size() != 2) {
EXIT(1) << "Invalid weight: '" << token << "'. --weights should"
"be of the form 'role=weight,role=weight'\n";
} else if (!roleInfos.contains(pair[0])) {
EXIT(1) << "Invalid weight: '" << token << "'. " << pair[0]
<< " is not a valid role.";
}
double weight = atof(pair[1].c_str());
if (weight <= 0) {
EXIT(1) << "Invalid weight: '" << token
<< "'. Weights must be positive.";
}
roleInfos[pair[0]].set_weight(weight);
}
}
foreachpair (const std::string& role,
const RoleInfo& roleInfo,
roleInfos) {
roles[role] = new Role(roleInfo);
}
// Initialize the allocator.
allocator->initialize(flags, self(), roleInfos);
// Start watching the slave whitelist.
whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
spawn(whitelistWatcher);
nextFrameworkId = 0;
nextSlaveId = 0;
nextOfferId = 0;
// Start all the statistics at 0.
stats.tasks[TASK_STAGING] = 0;
stats.tasks[TASK_STARTING] = 0;
stats.tasks[TASK_RUNNING] = 0;
stats.tasks[TASK_FINISHED] = 0;
stats.tasks[TASK_FAILED] = 0;
stats.tasks[TASK_KILLED] = 0;
stats.tasks[TASK_LOST] = 0;
stats.validStatusUpdates = 0;
stats.invalidStatusUpdates = 0;
stats.validFrameworkMessages = 0;
stats.invalidFrameworkMessages = 0;
startTime = Clock::now();
// Install handler functions for certain messages.
install<SubmitSchedulerRequest>(
&Master::submitScheduler,
&SubmitSchedulerRequest::name);
install<RegisterFrameworkMessage>(
&Master::registerFramework,
&RegisterFrameworkMessage::framework);
install<ReregisterFrameworkMessage>(
&Master::reregisterFramework,
&ReregisterFrameworkMessage::framework,
&ReregisterFrameworkMessage::failover);
install<UnregisterFrameworkMessage>(
&Master::unregisterFramework,
&UnregisterFrameworkMessage::framework_id);
install<DeactivateFrameworkMessage>(
&Master::deactivateFramework,
&DeactivateFrameworkMessage::framework_id);
install<ResourceRequestMessage>(
&Master::resourceRequest,
&ResourceRequestMessage::framework_id,
&ResourceRequestMessage::requests);
install<LaunchTasksMessage>(
&Master::launchTasks,
&LaunchTasksMessage::framework_id,
&LaunchTasksMessage::tasks,
&LaunchTasksMessage::filters,
&LaunchTasksMessage::offer_ids);
install<ReviveOffersMessage>(
&Master::reviveOffers,
&ReviveOffersMessage::framework_id);
install<KillTaskMessage>(
&Master::killTask,
&KillTaskMessage::framework_id,
&KillTaskMessage::task_id);
install<StatusUpdateAcknowledgementMessage>(
&Master::statusUpdateAcknowledgement,
&StatusUpdateAcknowledgementMessage::slave_id,
&StatusUpdateAcknowledgementMessage::framework_id,
&StatusUpdateAcknowledgementMessage::task_id,
&StatusUpdateAcknowledgementMessage::uuid);
install<FrameworkToExecutorMessage>(
&Master::schedulerMessage,
&FrameworkToExecutorMessage::slave_id,
&FrameworkToExecutorMessage::framework_id,
&FrameworkToExecutorMessage::executor_id,
&FrameworkToExecutorMessage::data);
install<RegisterSlaveMessage>(
&Master::registerSlave,
&RegisterSlaveMessage::slave);
install<ReregisterSlaveMessage>(
&Master::reregisterSlave,
&ReregisterSlaveMessage::slave_id,
&ReregisterSlaveMessage::slave,
&ReregisterSlaveMessage::executor_infos,
&ReregisterSlaveMessage::tasks,
&ReregisterSlaveMessage::completed_frameworks);
install<UnregisterSlaveMessage>(
&Master::unregisterSlave,
&UnregisterSlaveMessage::slave_id);
install<StatusUpdateMessage>(
&Master::statusUpdate,
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
install<ReconcileTasksMessage>(
&Master::reconcileTasks,
&ReconcileTasksMessage::framework_id,
&ReconcileTasksMessage::statuses);
install<ExitedExecutorMessage>(
&Master::exitedExecutor,
&ExitedExecutorMessage::slave_id,
&ExitedExecutorMessage::framework_id,
&ExitedExecutorMessage::executor_id,
&ExitedExecutorMessage::status);
install<AuthenticateMessage>(
&Master::authenticate,
&AuthenticateMessage::pid);
// Setup HTTP routes.
route("/health",
Http::HEALTH_HELP,
lambda::bind(&Http::health, http, lambda::_1));
route("/observe",
Http::OBSERVE_HELP,
lambda::bind(&Http::observe, http, lambda::_1));
route("/redirect",
Http::REDIRECT_HELP,
lambda::bind(&Http::redirect, http, lambda::_1));
route("/roles.json",
None(),
lambda::bind(&Http::roles, http, lambda::_1));
route("/shutdown",
Http::SHUTDOWN_HELP,
lambda::bind(&Http::shutdown, http, lambda::_1));
route("/state.json",
None(),
lambda::bind(&Http::state, http, lambda::_1));
route("/stats.json",
None(),
lambda::bind(&Http::stats, http, lambda::_1));
route("/tasks.json",
Http::TASKS_HELP,
lambda::bind(&Http::tasks, http, lambda::_1));
// Provide HTTP assets from a "webui" directory. This is either
// specified via flags (which is necessary for running out of the
// build directory before 'make install') or determined at build
// time via the preprocessor macro '-DMESOS_WEBUI_DIR' set in the
// Makefile.
provide("", path::join(flags.webui_dir, "master/static/index.html"));
provide("static", path::join(flags.webui_dir, "master/static"));
if (flags.log_dir.isSome()) {
Try<string> log = logging::getLogFile(
logging::getLogSeverity(flags.logging_level));
if (log.isError()) {
LOG(ERROR) << "Master log file cannot be found: " << log.error();
} else {
files->attach(log.get(), "/master/log")
.onAny(defer(self(), &Self::fileAttached, lambda::_1, log.get()));
}
}
contender->initialize(info_);
// Start contending to be a leading master and detecting the current
// leader.
contender->contend()
.onAny(defer(self(), &Master::contended, lambda::_1));
detector->detect()
.onAny(defer(self(), &Master::detected, lambda::_1));
}
void Master::finalize()
{
LOG(INFO) << "Master terminating";
// Remove the frameworks.
// Note we are not deleting the pointers to the frameworks from the
// allocator or the roles because it is unnecessary bookkeeping at
// this point since we are shutting down.
foreachvalue (Framework* framework, frameworks.registered) {
// Remove pending tasks from the framework.
framework->pendingTasks.clear();
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
Slave* slave = getSlave(task->slave_id());
// Since we only find out about tasks when the slave re-registers,
// it must be the case that the slave exists!
CHECK(slave != NULL)
<< "Unknown slave " << task->slave_id()
<< " in the task " << task->task_id();
removeTask(task);
}
// Remove the framework's offers (if they weren't removed before).
foreach (Offer* offer, utils::copy(framework->offers)) {
removeOffer(offer);
}
delete framework;
}
frameworks.registered.clear();
CHECK_EQ(offers.size(), 0UL);
foreachvalue (Slave* slave, slaves.registered) {
// Remove tasks that are in the slave but not in any framework.
// This could happen when the framework has yet to re-register
// after master failover.
// NOTE: keys() and values() are used because slave->tasks is
// modified by removeTask()!
foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
foreach (Task* task, slave->tasks[frameworkId].values()) {
removeTask(task);
}
}
// Kill the slave observer.
terminate(slave->observer);
wait(slave->observer);
delete slave->observer;
delete slave;
}
slaves.registered.clear();
foreachvalue (Future<Nothing> future, authenticating) {
// NOTE: This is necessary during tests because a copy of
// this future is used to setup authentication timeout. If a
// test doesn't discard this future, authentication timeout might
// fire in a different test and any associated callbacks
// (e.g., '_authenticate()') would be called. This is because the
// master pid doesn't change across the tests.
// TODO(vinod): This seems to be a bug in libprocess or the
// testing infrastructure.
future.discard();
}
foreachvalue (Role* role, roles) {
delete role;
}
roles.clear();
// NOTE: This is necessary during tests because we don't want the
// timer to fire in a different test and invoke the callback.
// The callback would be invoked because the master pid doesn't
// change across the tests.
// TODO(vinod): This seems to be a bug in libprocess or the
// testing infrastructure.
if (slaves.recoveredTimer.isSome()) {
Timer::cancel(slaves.recoveredTimer.get());
}
terminate(whitelistWatcher);
wait(whitelistWatcher);
delete whitelistWatcher;
}
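// Invoked when a linked framework scheduler or slave process exits:
// a disconnected framework is deactivated and given its failover
// timeout; slave disconnections are handled per the semantics
// described below.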
void Master::exited(const UPID& pid)
{
foreachvalue (Framework* framework, frameworks.registered) {
if (framework->pid == pid) {
LOG(INFO) << "Framework " << framework->id << " disconnected";
// Deactivate framework.
deactivate(framework);
// Set 'failoverTimeout' to the default and update only if the
// input is valid.
Try<Duration> failoverTimeout_ =
Duration::create(FrameworkInfo().failover_timeout());
CHECK_SOME(failoverTimeout_);
Duration failoverTimeout = failoverTimeout_.get();
failoverTimeout_ =
Duration::create(framework->info.failover_timeout());
if (failoverTimeout_.isSome()) {
failoverTimeout = failoverTimeout_.get();
} else {
LOG(WARNING) << "Using the default value for 'failover_timeout' because"
<< "the input value is invalid: "
<< failoverTimeout_.error();
}
LOG(INFO) << "Giving framework " << framework->id << " "
<< failoverTimeout << " to failover";
// Delay dispatching a message to ourselves for the timeout.
delay(failoverTimeout,
self(),
&Master::frameworkFailoverTimeout,
framework->id,
framework->reregisteredTime);
return;
}
}
// The semantics when a slave gets disconnected are as follows:
// 1) If the slave is not checkpointing, the slave is immediately
// removed and all tasks running on it are transitioned to LOST.
// No resources are recovered, because the slave is removed.
// 2) If the slave is checkpointing, the frameworks running on it
// fall into one of two cases:
// 2.1) Framework is checkpointing: No immediate action is taken.
// The slave is given a chance to reconnect until the slave
// observer times out (75s) and removes the slave (Case 1).
// 2.2) Framework is not checkpointing: The slave is not removed,
// but the framework is removed from the slave's structs,
// its tasks are transitioned to LOST and its resources recovered.
foreachvalue (Slave* slave, slaves.registered) {
if (slave->pid == pid) {
LOG(INFO) << "Slave " << *slave << " disconnected";
if (!slave->info.checkpoint()) {
// Remove the slave, if it is not checkpointing.
LOG(INFO) << "Removing disconnected slave " << *slave
<< " because it is not checkpointing!";
removeSlave(slave);
return;
} else if (!slave->disconnected) {
// Checkpointing slaves can just be disconnected.
disconnect(slave);
// Remove all non-checkpointing frameworks.
hashset<FrameworkID> frameworkIds =
slave->tasks.keys() | slave->executors.keys();
foreach (const FrameworkID& frameworkId, frameworkIds) {
Framework* framework = getFramework(frameworkId);
if (framework != NULL && !framework->info.checkpoint()) {
LOG(INFO) << "Removing framework " << frameworkId
<< " from disconnected slave " << *slave
<< " because the framework is not checkpointing";
removeFramework(slave, framework);
}
}
} else {
LOG(WARNING) << "Ignoring duplicate exited() notification for "
<< "checkpointing slave " << *slave;
}
}
}
}
void Master::visit(const MessageEvent& event)
{
// There are three cases about the message's UPID with respect to
// 'frameworks.principals':
// 1) if a <UPID, principal> pair exists and the principal is Some,
// it's a framework with its principal specified.
// 2) if a <UPID, principal> pair exists and the principal is None,
// it's a framework without a principal.
// 3) if a <UPID, principal> pair does not exist in the map, it's
// either an unregistered framework or not a framework.
// The logic for framework message counters and rate limiting
// is mainly concerned with whether the UPID is a *registered*
// framework and whether the framework has a principal, so we use
// these two temp variables to simplify the condition checks below.
bool isRegisteredFramework =
frameworks.principals.contains(event.message->from);
const Option<string> principal = isRegisteredFramework
? frameworks.principals[event.message->from]
: Option<string>::none();
// Increment the "message_received" counter if the message is from
// a framework and such a counter is configured for it.
// See comments for 'Master::Metrics::Frameworks' and
// 'Master::Frameworks::principals' for details.
if (principal.isSome()) {
// If the framework has a principal, the counter must exist.
CHECK(metrics.frameworks.contains(principal.get()));
Counter messages_received =
metrics.frameworks.get(principal.get()).get()->messages_received;
++messages_received;
}
// All messages are filtered when non-leading.
if (!elected()) {
VLOG(1) << "Dropping '" << event.message->name << "' message since "
<< "not elected yet";
++metrics.dropped_messages;
return;
}
CHECK_SOME(recovered);
// All messages are filtered while recovering.
// TODO(bmahler): Consider instead re-enqueuing *all* messages
// through recover(). What are the performance implications of
// the additional queueing delay and the accumulated backlog
// of messages post-recovery?
if (!recovered.get().isReady()) {
VLOG(1) << "Dropping '" << event.message->name << "' message since "
<< "not recovered yet";
++metrics.dropped_messages;
return;
}
// Throttle the message if it's a framework message and a
// RateLimiter is configured for the framework's principal.
// The framework is throttled by the default RateLimiter if:
// 1) the default RateLimiter is configured (and)
// 2) the framework doesn't have a principal or its principal is
// not specified in 'flags.rate_limits'.
// The framework is not throttled if:
// 1) the default RateLimiter is not configured to handle case 2)
// above. (or)
// 2) the principal exists in RateLimits but 'qps' is not set.
if (principal.isSome() &&
limiters.contains(principal.get()) &&
limiters[principal.get()].isSome()) {
const Owned<BoundedRateLimiter>& limiter = limiters[principal.get()].get();
if (limiter->capacity.isNone() ||
limiter->messages < limiter->capacity.get()) {
limiter->messages++;
limiter->limiter->acquire()
.onReady(defer(self(), &Self::throttled, event, principal));
} else {
exceededCapacity(
event,
principal,
limiter->capacity.get());
}
} else if ((principal.isNone() || !limiters.contains(principal.get())) &&
isRegisteredFramework &&
defaultLimiter.isSome()) {
if (defaultLimiter.get()->capacity.isNone() ||
defaultLimiter.get()->messages < defaultLimiter.get()->capacity.get()) {
defaultLimiter.get()->messages++;
defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), &Self::throttled, event, None()));
} else {
exceededCapacity(
event,
principal,
defaultLimiter.get()->capacity.get());
}
} else {
_visit(event);
}
}
void Master::visit(const ExitedEvent& event)
{
// See comments in 'visit(const MessageEvent& event)' for which
// RateLimiter is used to throttle this UPID and when it is not
// throttled.
// Note that throttling ExitedEvent is necessary so the order
// between MessageEvents and ExitedEvents from the same PID is
// maintained. Also ExitedEvents are not subject to the capacity.
bool isRegisteredFramework = frameworks.principals.contains(event.pid);
const Option<string> principal = isRegisteredFramework
? frameworks.principals[event.pid]
: Option<string>::none();
// Necessary to disambiguate below.
typedef void(Self::*F)(const ExitedEvent&);
if (principal.isSome() &&
limiters.contains(principal.get()) &&
limiters[principal.get()].isSome()) {
limiters[principal.get()].get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else if ((principal.isNone() || !limiters.contains(principal.get())) &&
isRegisteredFramework &&
defaultLimiter.isSome()) {
defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), static_cast<F>(&Self::_visit), event));
} else {
_visit(event);
}
}
void Master::throttled(
const MessageEvent& event,
const Option<std::string>& principal)
{
// We already know a RateLimiter is used to throttle this event so
// here we only need to determine which.
if (principal.isSome()) {
CHECK_SOME(limiters[principal.get()]);
limiters[principal.get()].get()->messages--;
} else {
CHECK_SOME(defaultLimiter);
defaultLimiter.get()->messages--;
}
_visit(event);
}
void Master::_visit(const MessageEvent& event)
{
// Obtain the principal before processing the Message because the
// mapping may be deleted in handling 'UnregisterFrameworkMessage'
// but its counter still needs to be incremented for this message.
const Option<string> principal =
frameworks.principals.contains(event.message->from)
? frameworks.principals[event.message->from]
: Option<string>::none();
ProtobufProcess<Master>::visit(event);
// Increment 'messages_processed' counter if it still exists.
// Note that it could be removed in handling
// 'UnregisterFrameworkMessage' if it's the last framework with
// this principal.
if (principal.isSome() && metrics.frameworks.contains(principal.get())) {
Counter messages_processed =
metrics.frameworks.get(principal.get()).get()->messages_processed;
++messages_processed;
}
}
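// Invoked when a throttled framework already has its configured capacity
// of messages queued: the message is dropped and the framework is sent
// a FrameworkErrorMessage.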
void Master::exceededCapacity(
const MessageEvent& event,
const Option<string>& principal,
uint64_t capacity)
{
LOG(WARNING) << "Dropping message " << event.message->name << " from "
<< event.message->from
<< (principal.isSome() ? "(" + principal.get() + ")" : "")
<< ": capacity(" << capacity << ") exceeded";
// Send an error to the framework which will abort the scheduler
// driver.
// NOTE: The scheduler driver will send back a
// DeactivateFrameworkMessage which may be dropped as well but this
// should be fine because the scheduler is already informed of an
// unrecoverable error and should take action to recover.
FrameworkErrorMessage message;
message.set_message(
"Message " + event.message->name +
" dropped: capacity(" + stringify(capacity) + ") exceeded");
send(event.message->from, message);
}
void Master::_visit(const ExitedEvent& event)
{
Process<Master>::visit(event);
}
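// Helper used as an 'onFailed' / 'onDiscarded' callback: aborts the
// master with the given failure message.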
void fail(const string& message, const string& failure)
{
LOG(FATAL) << message << ": " << failure;
}
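// Kicks off recovery from the registrar. Only the elected leader may
// recover; the result is memoized so that repeated calls share the
// same future.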
Future<Nothing> Master::recover()
{
if (!elected()) {
return Failure("Not elected as leading master");
}
if (recovered.isNone()) {
LOG(INFO) << "Recovering from registrar";
recovered = registrar->recover(info_)
.then(defer(self(), &Self::_recover, lambda::_1));
}
return recovered.get();
}
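// Continuation of recover(): records the slaves found in the registry
// and starts the timeout within which they must re-register.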
Future<Nothing> Master::_recover(const Registry& registry)
{
foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
slaves.recovered.insert(slave.info().id());
}
// Set up a timeout for slaves to re-register. This timeout is based
// on the maximum amount of time the SlaveObserver allows slaves to
// not respond to health checks.
// TODO(bmahler): Consider making this configurable.
slaves.recoveredTimer =
delay(flags.slave_reregister_timeout,
self(),
&Self::recoveredSlavesTimeout,
registry);
// Recovery is now complete!
LOG(INFO) << "Recovered " << registry.slaves().slaves().size() << " slaves"
<< " from the Registry (" << Bytes(registry.ByteSize()) << ")"
<< " ; allowing " << flags.slave_reregister_timeout
<< " for slaves to re-register";
return Nothing();
}
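// Invoked when the slave re-registration timeout fires: aborts the master
// if the fraction of recovered slaves that did not re-register exceeds
// the removal limit; otherwise removes those slaves from the registry
// (and, when a strict registry is in use, from the master as well).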
void Master::recoveredSlavesTimeout(const Registry& registry)
{
CHECK(elected());
// TODO(bmahler): Add a 'Percentage' abstraction.
Try<double> limit_ = numify<double>(
strings::remove(
flags.recovery_slave_removal_limit,
"%",
strings::SUFFIX));
CHECK_SOME(limit_);
double limit = limit_.get() / 100.0;
// Compute the percentage of slaves to be removed; if it exceeds the
// safety-net limit, bail!
double removalPercentage =
(1.0 * slaves.recovered.size()) /
(1.0 * registry.slaves().slaves().size());
if (removalPercentage > limit) {
EXIT(1) << "Post-recovery slave removal limit exceeded! After "
<< SLAVE_PING_TIMEOUT * MAX_SLAVE_PING_TIMEOUTS
<< " there were " << slaves.recovered.size()
<< " (" << removalPercentage * 100 << "%) slaves recovered from the"
<< " registry that did not re-register: \n"
<< stringify(slaves.recovered) << "\n "
<< " The configured removal limit is " << limit * 100 << "%. Please"
<< " investigate or increase this limit to proceed further";
}
foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
if (!slaves.recovered.contains(slave.info().id())) {
continue; // Slave re-registered.
}
LOG(WARNING) << "Slave " << slave.info().id()
<< " (" << slave.info().hostname() << ") did not re-register "
<< "within the timeout; removing it from the registrar";
++metrics.recovery_slave_removals;
slaves.recovered.erase(slave.info().id());
if (flags.registry_strict) {
slaves.removing.insert(slave.info().id());
registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
.onAny(defer(self(),
&Self::_removeSlave,
slave.info(),
vector<StatusUpdate>(), // No TASK_LOST updates to send.
lambda::_1));
} else {
// When a non-strict registry is in use, we want to ensure the
// registry is used in a write-only manner. Therefore we remove
// the slave from the registry but we do not inform the
// framework.
const string& message =
"Failed to remove slave " + stringify(slave.info().id());
registrar->apply(Owned<Operation>(new RemoveSlave(slave.info())))
.onFailed(lambda::bind(fail, message, lambda::_1));
}
}
}
void Master::fileAttached(const Future<Nothing>& result, const string& path)
{
if (result.isReady()) {
LOG(INFO) << "Successfully attached file '" << path << "'";
} else {
LOG(ERROR) << "Failed to attach file '" << path << "': "
<< (result.isFailed() ? result.failure() : "discarded");
}
}
void Master::submitScheduler(const string& name)
{
LOG(INFO) << "Scheduler submit request for " << name;
SubmitSchedulerResponse response;
response.set_okay(false);
reply(response);
}
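// Invoked with the result of contending for leadership: exits on
// failure, otherwise watches for the candidacy to be lost.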
void Master::contended(const Future<Future<Nothing> >& candidacy)
{
CHECK(!candidacy.isDiscarded());
if (candidacy.isFailed()) {
EXIT(1) << "Failed to contend: " << candidacy.failure();
}
// Watch for candidacy change.
candidacy.get()
.onAny(defer(self(), &Master::lostCandidacy, lambda::_1));
}
void Master::lostCandidacy(const Future<Nothing>& lost)
{
CHECK(!lost.isDiscarded());
if (lost.isFailed()) {
EXIT(1) << "Failed to watch for candidacy: " << lost.failure();
}
if (elected()) {
EXIT(1) << "Lost leadership... committing suicide!";
}
LOG(INFO) << "Lost candidacy as a follower... Contend again";
contender->contend()
.onAny(defer(self(), &Master::contended, lambda::_1));
}
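// Invoked whenever the leading master is detected (or changes): commits
// suicide if this master lost leadership, starts recovery when newly
// elected, and keeps detecting.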
void Master::detected(const Future<Option<MasterInfo> >& _leader)
{
CHECK(!_leader.isDiscarded());
if (_leader.isFailed()) {
EXIT(1) << "Failed to detect the leading master: " << _leader.failure()
<< "; committing suicide!";
}
bool wasElected = elected();
leader = _leader.get();
LOG(INFO) << "The newly elected leader is "
<< (leader.isSome()
? (leader.get().pid() + " with id " + leader.get().id())
: "None");
if (wasElected && !elected()) {
EXIT(1) << "Lost leadership... committing suicide!";
}
if (elected()) {
electedTime = Clock::now();
if (!wasElected) {
LOG(INFO) << "Elected as the leading master!";
// Begin the recovery process, bail if it fails or is discarded.
recover()
.onFailed(lambda::bind(fail, "Recovery failed", lambda::_1))
.onDiscarded(lambda::bind(fail, "Recovery failed", "discarded"));
} else {
// This happens if there is a ZK blip that causes a re-election
// but the same leading master is elected as leader.
LOG(INFO) << "Re-elected as the leading master";
}
}
// Keep detecting.
detector->detect(leader)
.onAny(defer(self(), &Master::detected, lambda::_1));
}
// Helper to convert authorization result to Future<Option<Error> >.
static Future<Option<Error> > _authorize(const string& message, bool authorized)
{
if (authorized) {
return None();
}
return Error(message);
}
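// Validates a (re-)registering framework: it must be authenticated (when
// authentication is required), its principal (if any) must match the
// authenticated principal, its role must be known, and, when authorization
// is enabled, its principal must be authorized to use the role.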
Future<Option<Error> > Master::validate(
const FrameworkInfo& frameworkInfo,
const UPID& from)
{
if (flags.authenticate_frameworks) {
if (!authenticated.contains(from)) {
// This could happen if another authentication request came
// through before we got here, or if a framework tried to
// (re-)register without authenticating.
return Error("Framework at " + stringify(from) + " is not authenticated");
} else if (frameworkInfo.has_principal() &&
frameworkInfo.principal() != authenticated[from]) {
return Error(
"Framework principal '" + frameworkInfo.principal() +
"' does not match authenticated principal '" + authenticated[from] +
"'");
} else if (!frameworkInfo.has_principal()) {
// We allow an authenticated framework to not specify a
// principal in its FrameworkInfo, but we'd prefer that it did, so
// we log a WARNING here when this happens.
LOG(WARNING) << "Framework at " << from << " (authenticated as '"
<< authenticated[from]
<< "') does not specify principal in its FrameworkInfo";
}
}
// TODO(vinod): Deprecate this in favor of ACLs.
if (!roles.contains(frameworkInfo.role())) {
return Error("Role '" + frameworkInfo.role() + "' is invalid");
}
if (authorizer.isNone()) {
// Authorization is disabled.
return None();
}
LOG(INFO)
<< "Authorizing framework principal '" << frameworkInfo.principal()
<< "' to receive offers for role '" << frameworkInfo.role() << "'";
mesos::ACL::RegisterFramework request;
if (frameworkInfo.has_principal()) {
request.mutable_principals()->add_values(frameworkInfo.principal());
} else {
// Framework doesn't have a principal set.
request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
}
request.mutable_roles()->add_values(frameworkInfo.role());
return authorizer.get()->authorize(request).then(
lambda::bind(&_authorize,
"Not authorized to use role '" + frameworkInfo.role() + "'",
lambda::_1));
}
void Master::registerFramework(
const UPID& from,
const FrameworkInfo& frameworkInfo)
{
++metrics.messages_register_framework;
if (authenticating.contains(from)) {
// TODO(vinod): Consider dropping this request and fix the tests
// to deal with the drop. Currently there is a race between the
// master realizing the framework is authenticated and the framework
// sending a registration request. Dropping this message will cause
// the framework to retry, slowing down the tests.
LOG(INFO) << "Queuing up registration request from " << from
<< " because authentication is still in progress";
authenticating[from]
.onReady(defer(self(), &Self::registerFramework, from, frameworkInfo));
return;
}
LOG(INFO) << "Received registration request from " << from;
validate(frameworkInfo, from)
.onAny(defer(self(),
&Master::_registerFramework,
from,
frameworkInfo,
lambda::_1));
}
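// Continuation of registerFramework() once validation completes.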
void Master::_registerFramework(
const UPID& from,
const FrameworkInfo& frameworkInfo,
const Future<Option<Error> >& validationError)
{
CHECK_READY(validationError);
if (validationError.get().isSome()) {
LOG(INFO) << "Refusing registration of framework at " << from << ": "
<< validationError.get().get().message;
FrameworkErrorMessage message;
message.set_message(validationError.get().get().message);
send(from, message);
return;
}
if (authenticating.contains(from)) {
// This could happen if a new authentication request came from the
// same framework while validation was in progress.
LOG(INFO) << "Dropping registration request from " << from
<< " because new authentication attempt is in progress";
return;
}
if (flags.authenticate_frameworks && !authenticated.contains(from)) {
// This could happen if another (failed over) framework
// authenticated while validation was in progress.
LOG(INFO) << "Dropping registration request from " << from
<< " because it is not authenticated";
return;
}
// Check if this framework is already registered (because it retries).
foreachvalue (Framework* framework, frameworks.registered) {
if (framework->pid == from) {
LOG(INFO) << "Framework " << framework->id << " (" << framework->pid
<< ") already registered, resending acknowledgement";
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
message.mutable_master_info()->MergeFrom(info_);
send(from, message);
return;
}
}
Framework* framework =
new Framework(frameworkInfo, newFrameworkId(), from, Clock::now());
LOG(INFO) << "Registering framework " << framework->id << " at " << from;
// TODO(vinod): Deprecate this in favor of authorization.
bool rootSubmissions = flags.root_submissions;
if (framework->info.user() == "root" && rootSubmissions == false) {
LOG(INFO) << "Framework " << framework->id << " registering as root, but "
<< "root submissions are disabled on this cluster";
FrameworkErrorMessage message;
message.set_message("User 'root' is not allowed to run frameworks");
send(from, message);
delete framework;
return;
}
addFramework(framework);
}
void Master::reregisterFramework(
const UPID& from,
const FrameworkInfo& frameworkInfo,
bool failover)
{
++metrics.messages_reregister_framework;
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up re-registration request from " << from
<< " because authentication is still in progress";
// TODO(vinod): Consider dropping this request and fix the tests
// to deal with the drop. See 'Master::registerFramework()' for
// more details.
authenticating[from]
.onReady(defer(self(),
&Self::reregisterFramework,
from,
frameworkInfo,
failover));
return;
}
if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
LOG(ERROR) << "Framework re-registering without an id!";
FrameworkErrorMessage message;
message.set_message("Framework reregistered without a framework id");
send(from, message);
return;
}
LOG(INFO) << "Received re-registration request from framework "
<< frameworkInfo.id() << " at " << from;
validate(frameworkInfo, from)
.onAny(defer(self(),
&Master::_reregisterFramework,
from,
frameworkInfo,
failover,
lambda::_1));
}
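// Continuation of reregisterFramework() once validation completes.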
void Master::_reregisterFramework(
const UPID& from,
const FrameworkInfo& frameworkInfo,
bool failover,
const Future<Option<Error> >& validationError)
{
CHECK_READY(validationError);
if (validationError.get().isSome()) {
LOG(INFO) << "Refusing re-registration of framework " << frameworkInfo.id()
<< " at " << from << ": " << validationError.get().get().message;
FrameworkErrorMessage message;
message.set_message(validationError.get().get().message);
send(from, message);
return;
}
if (authenticating.contains(from)) {
// This could happen if a new authentication request came from the
// same framework while validation was in progress.
LOG(INFO) << "Dropping re-registration request of framework "
<< frameworkInfo.id() << " at " << from
<< " because new authentication attempt is in progress";
return;
}
if (flags.authenticate_frameworks && !authenticated.contains(from)) {
// This could happen if another (failed over) framework
// authenticated while validation was in progress. It is important
// to drop this because if this request is from a failing over
// framework (pid = from) we don't want to failover the already
// registered framework (pid = framework->pid).
LOG(INFO) << "Dropping re-registration request of framework "
<< frameworkInfo.id() << " at " << from
<< " because it is not authenticated";
return;
}
LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
<< " at " << from;
if (frameworks.registered.count(frameworkInfo.id()) > 0) {
// Using the "failover" of the scheduler allows us to keep a
// scheduler that got partitioned but didn't die (in ZooKeeper
// speak this means didn't lose their session) and then
// eventually tried to connect to this master even though
// another instance of their scheduler has reconnected. This
// might not be an issue in the future when the
// master/allocator launches the scheduler can get restarted
// (if necessary) by the master and the master will always
// know which scheduler is the correct one.
Framework* framework = frameworks.registered[frameworkInfo.id()];
framework->reregisteredTime = Clock::now();
if (failover) {
// We do not attempt to detect a duplicate re-registration
// message here because it is impossible to distinguish between
// a duplicate message, and a scheduler failover to the same
// pid, given the existing libprocess primitives (PID does not
// identify the libprocess Process instance).
// TODO(benh): Should we check whether the new scheduler has
// given us a different framework name, user name or executor
// info?
LOG(INFO) << "Framework " << frameworkInfo.id() << " failed over";
failoverFramework(framework, from);
} else if (from != framework->pid) {
LOG(ERROR)
<< "Framework " << frameworkInfo.id() << " at " << from
<< " attempted to re-register while a framework at " << framework->pid
<< " is already registered";
FrameworkErrorMessage message;
message.set_message("Framework failed over");
send(from, message);
return;
} else {
LOG(INFO) << "Allowing the Framework " << frameworkInfo.id()
<< " to re-register with an already used id";
// Make sure we can get offers again.
// The framework might have been deactivated if it was doing
// authentication.
// TODO(vinod): Consider adding 'Master::activate(Framework*)'.
// TODO(vinod): Do this after we recover resources below.
if (!framework->active) {
framework->active = true;
allocator->frameworkActivated(framework->id, framework->info);
}
// Remove any offers sent to this framework.
// NOTE: We need to do this because the scheduler might have
// replied to the offers but the driver might have dropped
// those messages since it wasn't connected to the master.
foreach (Offer* offer, utils::copy(framework->offers)) {
allocator->resourcesRecovered(
offer->framework_id(),
offer->slave_id(),
offer->resources(),
None());
removeOffer(offer, true); // Rescind.
}
FrameworkReregisteredMessage message;
message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
message.mutable_master_info()->MergeFrom(info_);
send(from, message);
return;
}
} else {
// We don't have a framework with this ID, so we must be a newly
// elected Mesos master to which either an existing scheduler or a
// failed-over one is connecting. Create a Framework object and add
// any tasks it has that have been reported by reconnecting slaves.
Framework* framework =
new Framework(frameworkInfo, frameworkInfo.id(), from, Clock::now());
framework->reregisteredTime = Clock::now();
// TODO(benh): Check for root submissions like above!
// Add any running tasks reported by slaves for this framework.
foreachvalue (Slave* slave, slaves.registered) {
foreachkey (const FrameworkID& frameworkId, slave->tasks) {
foreachvalue (Task* task, slave->tasks[frameworkId]) {
if (framework->id == task->framework_id()) {
framework->addTask(task);
// Also add the task's executor for resource accounting
// if it's still alive on the slave and we've not yet
// added it to the framework.
if (task->has_executor_id() &&
slave->hasExecutor(framework->id, task->executor_id()) &&
!framework->hasExecutor(slave->id, task->executor_id())) {
const ExecutorInfo& executorInfo =
slave->executors[framework->id][task->executor_id()];
framework->addExecutor(slave->id, executorInfo);
}
}
}
}
}
// N.B. Need to add the framework _after_ we add its tasks
// (above) so that we can properly determine the resources it's
// currently using!
addFramework(framework);
}
CHECK(frameworks.registered.contains(frameworkInfo.id()))
<< "Unknown framework " << frameworkInfo.id();
// Broadcast the new framework pid to all the slaves. We have to
// broadcast because an executor might be running on a slave but
// it currently isn't running any tasks. This could be a
// potential scalability issue ...
foreachvalue (Slave* slave, slaves.registered) {
UpdateFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
message.set_pid(from);
send(slave->pid, message);
}
return;
}
void Master::unregisterFramework(
const UPID& from,
const FrameworkID& frameworkId)
{
++metrics.messages_unregister_framework;
LOG(INFO) << "Asked to unregister framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
if (framework->pid == from) {
removeFramework(framework);
} else {
LOG(WARNING)
<< "Ignoring unregister framework message for framework " << frameworkId
<< " from " << from << " because it is not from the registered"
<< " framework " << framework->pid;
}
}
}
void Master::deactivateFramework(
const UPID& from,
const FrameworkID& frameworkId)
{
++metrics.messages_deactivate_framework;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring deactivate framework message for framework " << frameworkId
<< " because the framework cannot be found";
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring deactivate framework message for framework " << frameworkId
<< " from '" << from << "' because it is not from the registered"
<< " framework '" << framework->pid << "'";
return;
}
deactivate(framework);
}
void Master::deactivate(Framework* framework)
{
CHECK_NOTNULL(framework);
LOG(INFO) << "Deactivating framework " << framework->id;
// Stop sending offers here for now.
framework->active = false;
// Tell the allocator to stop allocating resources to this framework.
allocator->frameworkDeactivated(framework->id);
// Remove the framework from authenticated. This is safe because
// a framework will always reauthenticate before (re-)registering.
authenticated.erase(framework->pid);
// Remove the framework's offers.
foreach (Offer* offer, utils::copy(framework->offers)) {
allocator->resourcesRecovered(
offer->framework_id(), offer->slave_id(), offer->resources(), None());
removeOffer(offer, true); // Rescind.
}
}
void Master::disconnect(Slave* slave)
{
CHECK_NOTNULL(slave);
LOG(INFO) << "Disconnecting slave " << slave->id;
// Mark the slave as disconnected and deactivate it in the allocator.
slave->disconnected = true;
allocator->slaveDeactivated(slave->id);
// Remove the slave from authenticated. This is safe because
// a slave will always reauthenticate before (re-)registering.
authenticated.erase(slave->pid);
// Remove and rescind offers.
foreach (Offer* offer, utils::copy(slave->offers)) {
allocator->resourcesRecovered(
offer->framework_id(), slave->id, offer->resources(), None());
removeOffer(offer, true); // Rescind!
}
}
void Master::resourceRequest(
const UPID& from,
const FrameworkID& frameworkId,
const vector<Request>& requests)
{
++metrics.messages_resource_request;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring resource request message from framework " << frameworkId
<< " because the framework cannot be found";
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring resource request message from framework " << frameworkId
<< " from '" << from << "' because it is not from the registered "
<< " framework '" << framework->pid << "'";
return;
}
LOG(INFO) << "Requesting resources for framework " << frameworkId;
allocator->resourcesRequested(frameworkId, requests);
}
// We use the visitor pattern to abstract the process of performing
// any validations, aggregations, etc. of tasks that a framework
// attempts to run within the resources provided by offers. A
// visitor can return an optional error (typedef'ed as an option of a
// string) which will cause the master to send a failed status update
// back to the framework for only that task description. An instance
// will be reused for each task description from the same 'launchTasks()',
// but not for task descriptions from different offers.
struct TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave) = 0;
virtual ~TaskInfoVisitor() {}
};
// Checks that a task id is valid, i.e., contains only valid characters.
struct TaskIDChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave)
{
const string& id = task.task_id().value();
if (std::count_if(id.begin(), id.end(), invalid) > 0) {
return "TaskID '" + id + "' contains invalid characters";
}
return None();
}
static bool invalid(char c)
{
return iscntrl(c) || c == '/' || c == '\\';
}
};
// Checks that the slave ID used by a task is correct.
struct SlaveIDChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave)
{
if (!(task.slave_id() == slave.id)) {
return "Task uses invalid slave " + task.slave_id().value() +
" while slave " + slave.id.value() + " is expected";
}
return None();
}
};
// Checks that each task uses a unique ID. Regardless of whether a
// task actually gets launched (for example, another checker may
// return an error for a task), we always consider it an error when a
// task tries to re-use an ID.
struct UniqueTaskIDChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave)
{
const TaskID& taskId = task.task_id();
if (framework.pendingTasks.contains(taskId) ||
framework.tasks.contains(taskId)) {
return "Task has duplicate ID: " + taskId.value();
}
return None();
}
};
// Checks that the resources used by a task on each slave do not
// exceed the total resources offered on that slave.
// NOTE: We do not account for executor resources here because tasks
// are launched asynchronously and an executor might exit between
// validation and actual launch. Therefore executor resources are
// accounted for in 'Master::_launchTasks()'.
struct ResourceUsageChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave)
{
if (task.resources().size() == 0) {
return stringify("Task uses no resources");
}
foreach (const Resource& resource, task.resources()) {
if (!Resources::isAllocatable(resource)) {
return "Task uses invalid resources: " + stringify(resource);
}
}
// Check if this task uses more resources than offered.
Resources taskResources = task.resources();
if (!(taskResources <= resources)) {
return "Task " + stringify(task.task_id()) + " attempted to use " +
stringify(taskResources) + " which is greater than offered " +
stringify(resources);
}
// Check this task's executor's resources.
if (task.has_executor()) {
// TODO(benh): Check that the executor uses some resources.
foreach (const Resource& resource, task.executor().resources()) {
if (!Resources::isAllocatable(resource)) {
// TODO(benh): Send back the invalid resources?
return "Executor for task " + stringify(task.task_id()) +
" uses invalid resources " + stringify(resource);
}
}
}
return None();
}
};
// Checks that tasks that use the "same" executor (i.e., same
// ExecutorID) have an identical ExecutorInfo.
struct ExecutorInfoChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave)
{
if (task.has_executor() == task.has_command()) {
return stringify(
"Task should have at least one (but not both) of CommandInfo or"
" ExecutorInfo present");
}
if (task.has_executor()) {
const ExecutorID& executorId = task.executor().executor_id();
Option<ExecutorInfo> executorInfo = None();
if (slave.hasExecutor(framework.id, executorId)) {
executorInfo = slave.executors.get(framework.id).get().get(executorId);
} else {
// See if any of the pending tasks have the same executor
// on the same slave.
// Note that picking the first matching executor is ok because
// all the matching executors have been added to
// 'framework.pendingTasks' after validation and hence have
// the same executor info.
foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
if (task_.has_executor() &&
task_.executor().executor_id() == executorId &&
task_.slave_id() == task.slave_id()) {
executorInfo = task_.executor();
break;
}
}
}
if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
return "Task has invalid ExecutorInfo (existing ExecutorInfo"
" with same ExecutorID is not compatible).\n"
"------------------------------------------------------------\n"
"Existing ExecutorInfo:\n" +
stringify(executorInfo.get()) + "\n"
"------------------------------------------------------------\n"
"Task's ExecutorInfo:\n" +
stringify(task.executor()) + "\n"
"------------------------------------------------------------\n";
}
}
return None();
}
};
// Checks that a task that asks for checkpointing is only being
// launched on a slave that has checkpointing enabled.
struct CheckpointChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
const TaskInfo& task,
const Resources& resources,
const Framework& framework,
const Slave& slave)
{
if (framework.info.checkpoint() && !slave.info.checkpoint()) {
return "Task asked to be checkpointed but slave " +
stringify(slave.id) + " has checkpointing disabled";
}
return None();
}
};
// OfferVisitors are similar to the TaskInfoVisitor pattern and
// are used for validation and aggregation of offers.
// The error reporting scheme is also similar to TaskInfoVisitor.
// However, offer processing (and subsequent task processing) is
// aborted altogether if an offer visitor reports an error.
struct OfferVisitor
{
virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master) = 0;
virtual ~OfferVisitor() {}
Slave* getSlave(Master* master, const SlaveID& slaveId)
{
CHECK_NOTNULL(master);
return master->getSlave(slaveId);
}
Offer* getOffer(Master* master, const OfferID& offerId)
{
CHECK_NOTNULL(master);
return master->getOffer(offerId);
}
};
// Checks validity/liveness of an offer.
struct ValidOfferChecker : OfferVisitor {
virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
{
Offer* offer = getOffer(master, offerId);
if (offer == NULL) {
return Error("Offer " + stringify(offerId) + " is no longer valid");
}
return None();
}
};
// Checks that an offer belongs to the expected framework.
struct FrameworkChecker : OfferVisitor {
virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
{
Offer* offer = getOffer(master, offerId);
if (offer == NULL) {
return Error("Offer " + stringify(offerId) + " is no longer valid");
}
if (!(framework.id == offer->framework_id())) {
return "Offer " + stringify(offer->id()) +
" has invalid framework " + stringify(offer->framework_id()) +
" while framework " + stringify(framework.id) + " is expected";
}
return None();
}
};
// Checks that the slave is valid and ensures that all offers belong to
// the same slave.
struct SlaveChecker : OfferVisitor
{
virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
{
Offer* offer = getOffer(master, offerId);
if (offer == NULL) {
return "Offer " + stringify(offerId) + " is no longer valid";
}
Slave* slave = getSlave(master, offer->slave_id());
// This is not possible because the offer should've been removed.
CHECK(slave != NULL)
<< "Offer " << offerId
<< " outlived slave " << offer->slave_id();
// This is not possible because the offer should've been removed.
CHECK(!slave->disconnected)
<< "Offer " << offerId
<< " outlived disconnected slave " << offer->slave_id();
if (slaveId.isNone()) {
// Set slave id and use as base case for validation.
slaveId = slave->id;
} else if (!(slave->id == slaveId.get())) {
return "Aggregated offers must belong to one single slave. Offer " +
stringify(offerId) + " uses slave " +
stringify(slave->id) + " and slave " +
stringify(slaveId.get());
}
return None();
}
Option<const SlaveID> slaveId;
};
// Checks that an offer only appears once in offer list.
struct UniqueOfferIDChecker : OfferVisitor
{
virtual Option<Error> operator () (
const OfferID& offerId,
const Framework& framework,
Master* master)
{
if (offers.contains(offerId)) {
return "Duplicate offer " + stringify(offerId) + " in offer list";
}
offers.insert(offerId);
return None();
}
hashset<OfferID> offers;
};
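// Handles a launch tasks request from a framework. The offers are
// first validated synchronously with the offer visitors above and
// their resources aggregated; each task is then validated (and
// possibly authorized) asynchronously before '_launchTasks()' is
// invoked with the results.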
void Master::launchTasks(
const UPID& from,
const FrameworkID& frameworkId,
const vector<TaskInfo>& tasks,
const Filters& filters,
const vector<OfferID>& offerIds)
{
++metrics.messages_launch_tasks;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring launch tasks message for offers " << stringify(offerIds)
<< " of framework " << frameworkId
<< " because the framework cannot be found";
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring launch tasks message for offers " << stringify(offerIds)
<< " of framework " << frameworkId << " from '" << from
<< "' because it is not from the registered framework '"
<< framework->pid << "'";
return;
}
// TODO(bmahler): We currently only support using multiple offers
// for a single slave.
Resources used;
Option<SlaveID> slaveId = None();
Option<Error> error = None();
if (offerIds.empty()) {
error = Error("No offers specified");
} else {
list<Owned<OfferVisitor> > offerVisitors;
offerVisitors.push_back(Owned<OfferVisitor>(new ValidOfferChecker()));
offerVisitors.push_back(Owned<OfferVisitor>(new FrameworkChecker()));
offerVisitors.push_back(Owned<OfferVisitor>(new SlaveChecker()));
offerVisitors.push_back(Owned<OfferVisitor>(new UniqueOfferIDChecker()));
// Validate the offers.
foreach (const OfferID& offerId, offerIds) {
foreach (const Owned<OfferVisitor>& visitor, offerVisitors) {
if (error.isNone()) {
error = (*visitor)(offerId, *framework, this);
}
}
}
// Compute used resources and remove the offers. If the
// validation failed, return resources to the allocator.
foreach (const OfferID& offerId, offerIds) {
Offer* offer = getOffer(offerId);
if (offer != NULL) {
slaveId = offer->slave_id();
used += offer->resources();
if (error.isSome()) {
allocator->resourcesRecovered(
offer->framework_id(),
offer->slave_id(),
offer->resources(),
None());
}
removeOffer(offer);
}
}
}
// If invalid, send TASK_LOST for the launch attempts.
if (error.isSome()) {
LOG(WARNING) << "Launch tasks message used invalid offers '"
<< stringify(offerIds) << "': " << error.get().message;
foreach (const TaskInfo& task, tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
task.slave_id(),
task.task_id(),
TASK_LOST,
"Task launched with invalid offers: " + error.get().message);
metrics.tasks_lost++;
stats.tasks[TASK_LOST]++;
forward(update, UPID(), framework);
}
return;
}
CHECK_SOME(slaveId);
Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
LOG(INFO) << "Processing reply for offers: "
<< stringify(offerIds)
<< " on slave " << *slave
<< " for framework " << framework->id;
// Validate each task and launch if valid.
list<Future<Option<Error> > > futures;
foreach (const TaskInfo& task, tasks) {
futures.push_back(validateTask(task, framework, slave, used));
// Add to pending tasks.
// NOTE: We need to do this here after validation because of the
// way task validators work.
framework->pendingTasks[task.task_id()] = task;
stats.tasks[TASK_STAGING]++;
}
// Wait for all the tasks to be validated.
// NOTE: We wait for all tasks because currently the allocator
// is expected to get 'resourcesRecovered()' once per 'launchTasks()'.
await(futures)
.onAny(defer(self(),
&Master::_launchTasks,
framework->id,
slaveId.get(),
tasks,
used,
filters,
lambda::_1));
}
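// Runs the task visitors over a single task and, if an authorizer is
// configured, asks it whether the framework's principal may launch
// the task as the requested user. Returns a future so that
// authorization can complete asynchronously.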
Future<Option<Error> > Master::validateTask(
const TaskInfo& task,
Framework* framework,
Slave* slave,
const Resources& totalResources)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(slave);
// Create task visitors.
// TODO(vinod): Create the visitors on the stack and make the visit
// operation const.
list<Owned<TaskInfoVisitor> > taskVisitors;
taskVisitors.push_back(Owned<TaskInfoVisitor>(new TaskIDChecker()));
taskVisitors.push_back(Owned<TaskInfoVisitor>(new SlaveIDChecker()));
taskVisitors.push_back(Owned<TaskInfoVisitor>(new UniqueTaskIDChecker()));
taskVisitors.push_back(Owned<TaskInfoVisitor>(new ResourceUsageChecker()));
taskVisitors.push_back(Owned<TaskInfoVisitor>(new ExecutorInfoChecker()));
taskVisitors.push_back(Owned<TaskInfoVisitor>(new CheckpointChecker()));
// TODO(benh): Add a HealthCheckChecker visitor.
// TODO(jieyu): Add a CommandInfoCheck visitor.
// Invoke each visitor.
Option<Error> error = None();
foreach (const Owned<TaskInfoVisitor>& visitor, taskVisitors) {
error = (*visitor)(task, totalResources, *framework, *slave);
if (error.isSome()) {
break;
}
}
if (error.isSome()) {
return Error(error.get().message);
}
if (authorizer.isNone()) {
// Authorization is disabled.
return None();
}
// Authorize the task.
string user = framework->info.user(); // Default user.
if (task.has_command() && task.command().has_user()) {
user = task.command().user();
} else if (task.has_executor() && task.executor().command().has_user()) {
user = task.executor().command().user();
}
LOG(INFO)
<< "Authorizing framework principal '" << framework->info.principal()
<< "' to launch task " << task.task_id() << " as user '" << user << "'";
mesos::ACL::RunTask request;
if (framework->info.has_principal()) {
request.mutable_principals()->add_values(framework->info.principal());
} else {
// Framework doesn't have a principal set.
request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
}
request.mutable_users()->add_values(user);
return authorizer.get()->authorize(request).then(
lambda::bind(&_authorize,
"Not authorized to launch as user '" + user + "'",
lambda::_1));
}
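// Adds a validated task (and its executor, if the executor is new)
// to the master's bookkeeping for the framework and slave, and sends
// a RunTaskMessage to the slave to actually launch the task.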
void Master::launchTask(
const TaskInfo& task,
Framework* framework,
Slave* slave)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(slave);
CHECK(!slave->disconnected);
// Determine if this task launches an executor, and if so make sure
// the slave and framework state has been updated accordingly.
Option<ExecutorID> executorId;
if (task.has_executor()) {
// TODO(benh): Refactor this code into Slave::addTask.
if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
<< "Executor " << task.executor().executor_id()
<< " known to the framework " << framework->id
<< " but unknown to the slave " << *slave;
slave->addExecutor(framework->id, task.executor());
framework->addExecutor(slave->id, task.executor());
}
executorId = task.executor().executor_id();
}
// Add the task to the framework and slave.
Task* t = new Task();
t->mutable_framework_id()->MergeFrom(framework->id);
t->set_state(TASK_STAGING);
t->set_name(task.name());
t->mutable_task_id()->MergeFrom(task.task_id());
t->mutable_slave_id()->MergeFrom(task.slave_id());
t->mutable_resources()->MergeFrom(task.resources());
if (executorId.isSome()) {
t->mutable_executor_id()->MergeFrom(executorId.get());
}
framework->addTask(t);
slave->addTask(t);
// Tell the slave to launch the task!
LOG(INFO) << "Launching task " << task.task_id()
<< " of framework " << framework->id
<< " with resources " << task.resources()
<< " on slave " << *slave;
RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
message.mutable_framework_id()->MergeFrom(framework->id);
message.set_pid(framework->pid);
message.mutable_task()->MergeFrom(task);
send(slave->pid, message);
return;
}
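// Continuation for 'launchTasks()': invoked once all per-task
// validation futures have completed. Tasks that failed validation or
// authorization get TASK_LOST updates, valid tasks are launched, and
// any unused resources are returned to the allocator.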
void Master::_launchTasks(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const vector<TaskInfo>& tasks,
const Resources& totalResources,
const Filters& filters,
const Future<list<Future<Option<Error> > > >& validationErrors)
{
CHECK_READY(validationErrors);
CHECK_EQ(validationErrors.get().size(), tasks.size());
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring launch tasks message for framework " << frameworkId
<< " because the framework cannot be found";
// Tell the allocator about the recovered resources.
allocator->resourcesRecovered(frameworkId, slaveId, totalResources, None());
return;
}
Slave* slave = getSlave(slaveId);
if (slave == NULL || slave->disconnected) {
foreach (const TaskInfo& task, tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
task.slave_id(),
task.task_id(),
TASK_LOST,
(slave == NULL ? "Slave removed" : "Slave disconnected"));
metrics.tasks_lost++;
stats.tasks[TASK_LOST]++;
forward(update, UPID(), framework);
}
// Tell the allocator about the recovered resources.
allocator->resourcesRecovered(frameworkId, slaveId, totalResources, None());
return;
}
Resources usedResources; // Accumulated resources used.
size_t index = 0;
foreach (const Future<Option<Error> >& future, validationErrors.get()) {
const TaskInfo& task = tasks[index++];
// NOTE: The task will not be in 'pendingTasks' if 'killTask()'
// for the task was called before we got here.
if (!framework->pendingTasks.contains(task.task_id())) {
continue;
}
framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
CHECK(!future.isDiscarded());
if (future.isFailed() || future.get().isSome()) {
const string error = future.isFailed()
? "Authorization failure: " + future.failure()
: future.get().get().message;
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
task.slave_id(),
task.task_id(),
TASK_LOST,
error);
metrics.tasks_lost++;
stats.tasks[TASK_LOST]++;
forward(update, UPID(), framework);
continue;
}
// Check if resources needed by the task (and its executor in case
// the executor is new) are available. These resources will be
// added by 'launchTask()' below.
Resources resources = task.resources();
if (task.has_executor() &&
!slave->hasExecutor(framework->id, task.executor().executor_id())) {
resources += task.executor().resources();
}
if (!(usedResources + resources <= totalResources)) {
const string error =
"Task uses more resources " + stringify(resources) +
" than available " + stringify(totalResources - usedResources);
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
task.slave_id(),
task.task_id(),
TASK_LOST,
error);
metrics.tasks_lost++;
stats.tasks[TASK_LOST]++;
forward(update, UPID(), framework);
continue;
}
// Launch task.
launchTask(task, framework, slave);
usedResources += resources;
}
// All used resources should be allocatable, enforced by our validators.
CHECK_EQ(usedResources, usedResources.allocatable());
// Calculate unused resources.
Resources unusedResources = totalResources - usedResources;
if (unusedResources.allocatable().size() > 0) {
// Tell the allocator about the unused (e.g., refused) resources.
allocator->resourcesRecovered(
frameworkId, slaveId, unusedResources, filters);
}
}
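// Validates a revive offers request and asks the allocator to revive
// offers for the framework.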
void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
{
++metrics.messages_revive_offers;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring revive offers message for framework " << frameworkId
<< " because the framework cannot be found";
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring revive offers message for framework " << frameworkId
<< " from '" << from << "' because it is not from the registered"
<< " framework '" << framework->pid << "'";
return;
}
LOG(INFO) << "Reviving offers for framework " << framework->id;
allocator->offersRevived(framework->id);
}
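// Handles a kill task request from a framework. Pending tasks are
// removed and reported as TASK_KILLED; known tasks result in a
// KillTaskMessage to the slave; unknown tasks are either ignored or
// reported as TASK_LOST, depending on the registry mode and whether
// any slaves are still transitioning.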
void Master::killTask(
const UPID& from,
const FrameworkID& frameworkId,
const TaskID& taskId)
{
++metrics.messages_kill_task;
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring kill task message for task " << taskId << " of framework "
<< frameworkId << " because the framework cannot be found";
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring kill task message for task " << taskId
<< " of framework " << frameworkId << " from '" << from
<< "' because it is not from the registered framework '"
<< framework->pid << "'";
return;
}
if (framework->pendingTasks.contains(taskId)) {
// Remove from pending tasks.
framework->pendingTasks.erase(taskId);
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
None(),
taskId,
TASK_KILLED,
"Killed pending task");
forward(update, UPID(), framework);
return;
}
Task* task = framework->getTask(taskId);
if (task == NULL) {
// TODO(bmahler): Per MESOS-1200, if we knew the SlaveID here we
// could reply more frequently in the presence of slaves in a
// transitionary state.
if (!slaves.recovered.empty()) {
LOG(WARNING)
<< "Cannot kill task " << taskId << " of framework " << frameworkId
<< " because the slave containing this task may not have re-registered"
<< " yet with this master";
} else if (!slaves.reregistering.empty()) {
LOG(WARNING)
<< "Cannot kill task " << taskId << " of framework " << frameworkId
<< " because the slave may be in the process of being re-admitted by"
<< " the registrar";
} else if (!slaves.removing.empty()) {
LOG(WARNING)
<< "Cannot kill task " << taskId << " of framework " << frameworkId
<< " because the slave may be in the process of being removed from the"
<< " registrar, it is likely TASK_LOST updates will occur when the"
<< " slave is removed";
} else if (flags.registry_strict) {
// For a strict registry, if there are no slaves transitioning
// between states, then this task is definitely unknown!
LOG(WARNING)
<< "Cannot kill task " << taskId << " of framework " << frameworkId
<< " because it cannot be found; sending TASK_LOST since there are"
<< " no transitionary slaves";
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
None(),
taskId,
TASK_LOST,
"Attempted to kill an unknown task");
forward(update, UPID(), framework);
} else {
// For a non-strict registry, the slave holding this task could
// be readmitted even if we have no knowledge of it.
LOG(WARNING)
<< "Cannot kill task " << taskId << " of framework " << frameworkId
<< " because it cannot be found; cannot send TASK_LOST since a"
<< " non-strict registry is in use";
}
return;
}
Slave* slave = getSlave(task->slave_id());
CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
// We add the task to 'killedTasks' here because the slave
// might be partitioned or disconnected but the master
// doesn't know it yet.
slave->killedTasks.put(frameworkId, taskId);
// NOTE: This task will be properly reconciled when the
// disconnected slave re-registers with the master.
if (!slave->disconnected) {
LOG(INFO) << "Telling slave " << *slave
<< " to kill task " << taskId
<< " of framework " << frameworkId;
KillTaskMessage message;
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_task_id()->MergeFrom(taskId);
send(slave->pid, message);
}
}
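// Forwards a status update acknowledgement from the framework to the
// slave that originated the update, after validating both the
// framework and the slave.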
void Master::statusUpdateAcknowledgement(
const UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const TaskID& taskId,
const string& uuid)
{
metrics.messages_status_update_acknowledgement++;
// TODO(bmahler): Consider adding a message validator abstraction
// for the master that takes care of all this boilerplate. Ideally
// by the time we process messages in the critical master code, we
// can assume that they are valid. This will become especially
// important as validation logic is moved out of the scheduler
// driver and into the master.
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring status update acknowledgement message for task " << taskId
<< " of framework " << frameworkId << " on slave " << slaveId
<< " because the framework cannot be found";
metrics.invalid_status_update_acknowledgements++;
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring status update acknowledgement message for task " << taskId
<< " of framework " << frameworkId << " on slave " << slaveId
<< " from " << from << " because it is not from the registered framework "
<< framework->pid;
metrics.invalid_status_update_acknowledgements++;
return;
}
Slave* slave = getSlave(slaveId);
if (slave == NULL) {
LOG(WARNING)
<< "Cannot send status update acknowledgement message for task " << taskId
<< " of framework " << frameworkId << " to slave " << slaveId
<< " because slave is not registered";
metrics.invalid_status_update_acknowledgements++;
return;
}
if (slave->disconnected) {
LOG(WARNING)
<< "Cannot send status update acknowledgement message for task " << taskId
<< " of framework " << frameworkId << " to slave " << *slave
<< " because slave is disconnected";
metrics.invalid_status_update_acknowledgements++;
return;
}
LOG(INFO) << "Forwarding status update acknowledgement "
<< UUID::fromBytes(uuid) << " for task " << taskId
<< " of framework " << frameworkId << " to slave " << *slave;
// TODO(bmahler): Once we store terminal unacknowledged updates in
// the master per MESOS-1410, this is where we'll find the
// unacknowledged task and remove it if present.
// Also, be sure to confirm Master::reconcile is still correct!
StatusUpdateAcknowledgementMessage message;
message.mutable_slave_id()->CopyFrom(slaveId);
message.mutable_framework_id()->CopyFrom(frameworkId);
message.mutable_task_id()->CopyFrom(taskId);
message.set_uuid(uuid);
send(slave->pid, message);
metrics.valid_status_update_acknowledgements++;
}
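// Forwards a framework-to-executor message to the slave that is
// running the executor, after validating both the framework and the
// slave.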
void Master::schedulerMessage(
const UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const string& data)
{
++metrics.messages_framework_to_executor;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING)
<< "Ignoring framework message for executor " << executorId
<< " of framework " << frameworkId
<< " because the framework cannot be found";
stats.invalidFrameworkMessages++;
metrics.invalid_framework_to_executor_messages++;
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring framework message for executor " << executorId
<< " of framework " << frameworkId << " from " << from
<< " because it is not from the registered framework "
<< framework->pid;
stats.invalidFrameworkMessages++;
metrics.invalid_framework_to_executor_messages++;
return;
}
Slave* slave = getSlave(slaveId);
if (slave == NULL) {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << slaveId
<< " because slave is not registered";
stats.invalidFrameworkMessages++;
metrics.invalid_framework_to_executor_messages++;
return;
}
if (slave->disconnected) {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << *slave
<< " because slave is disconnected";
stats.invalidFrameworkMessages++;
metrics.invalid_framework_to_executor_messages++;
return;
}
LOG(INFO) << "Sending framework message for framework "
<< frameworkId << " to slave " << *slave;
FrameworkToExecutorMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
message.set_data(data);
send(slave->pid, message);
stats.validFrameworkMessages++;
metrics.valid_framework_to_executor_messages++;
}
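// Handles a (first-time) slave registration. Registration is a
// two-step process: the slave is admitted into the registry via the
// registrar, and '_registerSlave()' completes the registration once
// the admission operation finishes.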
void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
{
++metrics.messages_register_slave;
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up registration request from " << from
<< " because authentication is still in progress";
authenticating[from]
.onReady(defer(self(), &Self::registerSlave, from, slaveInfo));
return;
}
if (flags.authenticate_slaves && !authenticated.contains(from)) {
// This could happen if another authentication request came
// through before we got here or if a slave tried to register
// without authentication.
LOG(WARNING) << "Refusing registration of slave at " << from
<< " because it is not authenticated";
ShutdownMessage message;
message.set_message("Slave is not authenticated");
send(from, message);
return;
}
// Check if this slave is already registered (because it retries).
foreachvalue (Slave* slave, slaves.registered) {
if (slave->pid == from) {
if (slave->disconnected) {
// The slave was previously disconnected but it is now trying
// to register as a new slave. This could happen if the slave
// failed recovery and hence is registering as a new slave before
// the master has removed the old slave from its map.
LOG(INFO)
<< "Removing old disconnected slave " << *slave
<< " because a registration attempt is being made from " << from;
removeSlave(slave);
break;
} else {
LOG(INFO) << "Slave " << *slave << " already registered,"
<< " resending acknowledgement";
SlaveRegisteredMessage message;
message.mutable_slave_id()->MergeFrom(slave->id);
send(from, message);
return;
}
}
}
// We need to generate a SlaveID and admit this slave only *once*.
if (slaves.registering.contains(from)) {
LOG(INFO) << "Ignoring register slave message from " << from
<< " (" << slaveInfo.hostname() << ") as admission is"
<< " already in progress";
return;
}
slaves.registering.insert(from);
// Create and add the slave id.
SlaveInfo slaveInfo_ = slaveInfo;
slaveInfo_.mutable_id()->CopyFrom(newSlaveId());
LOG(INFO) << "Registering slave at " << from << " ("
<< slaveInfo.hostname() << ") with id " << slaveInfo_.id();
registrar->apply(Owned<Operation>(new AdmitSlave(slaveInfo_)))
.onAny(defer(self(),
&Self::_registerSlave,
slaveInfo_,
from,
lambda::_1));
}
void Master::_registerSlave(
const SlaveInfo& slaveInfo,
const UPID& pid,
const Future<bool>& admit)
{
slaves.registering.erase(pid);
CHECK(!admit.isDiscarded());
if (admit.isFailed()) {
LOG(FATAL) << "Failed to admit slave " << slaveInfo.id() << " at " << pid
<< " (" << slaveInfo.hostname() << "): " << admit.failure();
} else if (!admit.get()) {
// This means the slave is already present in the registrar; it's
// likely we generated a duplicate slave id!
LOG(ERROR) << "Slave " << slaveInfo.id() << " at " << pid
<< " (" << slaveInfo.hostname() << ") was not admitted, "
<< "asking to shut down";
slaves.removed.put(slaveInfo.id(), Nothing());
ShutdownMessage message;
message.set_message(
"Slave attempted to register but got duplicate slave id " +
stringify(slaveInfo.id()));
send(pid, message);
} else {
Slave* slave = new Slave(slaveInfo, slaveInfo.id(), pid, Clock::now());
LOG(INFO) << "Registered slave " << *slave;
++metrics.slave_registrations;
addSlave(slave);
}
}
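// Handles a slave re-registration. A slave already known to this
// master is updated in place and reconciled; an unknown slave (e.g.,
// after a master failover) must first be readmitted via the
// registrar, with '_reregisterSlave()' completing the process.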
void Master::reregisterSlave(
const UPID& from,
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks,
const vector<Archive::Framework>& completedFrameworks)
{
++metrics.messages_reregister_slave;
if (authenticating.contains(from)) {
LOG(INFO) << "Queuing up re-registration request from " << from
<< " because authentication is still in progress";
authenticating[from]
.onReady(defer(self(),
&Self::reregisterSlave,
from,
slaveId,
slaveInfo,
executorInfos,
tasks,
completedFrameworks));
return;
}
if (flags.authenticate_slaves && !authenticated.contains(from)) {
// This could happen if another authentication request came
// through before we got here or if a slave tried to
// re-register without authentication.
LOG(WARNING) << "Refusing re-registration of slave at " << from
<< " because it is not authenticated";
ShutdownMessage message;
message.set_message("Slave is not authenticated");
send(from, message);
return;
}
if (slaves.removed.get(slaveInfo.id()).isSome()) {
// To compensate for the case where a non-strict registrar is
// being used, we explicitly deny removed slaves from
// re-registering. This is because a non-strict registrar cannot
// enforce this. We've already told frameworks the tasks were
// lost so it's important to deny the slave from re-registering.
LOG(WARNING) << "Slave " << slaveId << " at " << from
<< " (" << slaveInfo.hostname() << ") attempted to "
<< "re-register after removal; shutting it down";
ShutdownMessage message;
message.set_message("Slave attempted to re-register after removal");
send(from, message);
return;
}
Slave* slave = getSlave(slaveInfo.id());
if (slave != NULL) {
slave->reregisteredTime = Clock::now();
// NOTE: This handles the case where a slave tries to
// re-register with an existing master (e.g., because of a
// spurious ZooKeeper session expiration or after the slave
// recovers after a restart).
// For now, we assume this slave is not nefarious (eventually
// this will be handled by orthogonal security measures like key
// based authentication).
LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
<< ") is being allowed to re-register with an already"
<< " in use id (" << slave->id << ")";
// TODO(bmahler): There's an implicit assumption here that when
// the master already knows about this slave, the slave cannot
// have tasks unknown to the master. This _should_ be the case
// since the causal relationship is:
// slave removes task -> master removes task
// We should enforce this via a CHECK (dangerous), or by shutting
// down slaves that are found to violate this assumption.
SlaveReregisteredMessage message;
message.mutable_slave_id()->MergeFrom(slave->id);
send(from, message);
// Update the slave pid and relink to it.
// NOTE: Always re-linking the slave here, rather than only when
// the slave is disconnected, can lead to multiple exited events
// in succession for a disconnected slave. As a result, we
// ignore duplicate exited events for disconnected checkpointing
// slaves.
// See: https://issues.apache.org/jira/browse/MESOS-675
slave->pid = from;
link(slave->pid);
// Reconcile tasks between master and the slave.
// NOTE: This needs to be done after the registration message is
// sent to the slave and the new pid is linked.
reconcile(slave, executorInfos, tasks);
// If this is a disconnected slave, add it back to the allocator.
// This is done after reconciliation to ensure the allocator's
// offers include the recovered resources initially on this
// slave.
if (slave->disconnected) {
slave->disconnected = false; // Reset the flag.
allocator->slaveActivated(slave->id);
}
// Inform the slave of the new framework pids for its tasks.
__reregisterSlave(slave, tasks);
return;
}
// Ensure we don't remove the slave for not re-registering after
// we've recovered it from the registry.
slaves.recovered.erase(slaveInfo.id());
// If we're already re-registering this slave, then no need to ask
// the registrar again.
if (slaves.reregistering.contains(slaveInfo.id())) {
LOG(INFO)
<< "Ignoring re-register slave message from slave "
<< slaveInfo.id() << " at " << from << " ("
<< slaveInfo.hostname() << ") as readmission is already in progress";
return;
}
LOG(INFO) << "Re-registering slave " << slaveInfo.id() << " at " << from
<< " (" << slaveInfo.hostname() << ")";
slaves.reregistering.insert(slaveInfo.id());
// This handles the case when the slave tries to re-register with
// a failed over master, in which case we must consult the
// registrar.
registrar->apply(Owned<Operation>(new ReadmitSlave(slaveInfo)))
.onAny(defer(self(),
&Self::_reregisterSlave,
slaveInfo,
from,
executorInfos,
tasks,
completedFrameworks,
lambda::_1));
}
void Master::_reregisterSlave(
const SlaveInfo& slaveInfo,
const UPID& pid,
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks,
const vector<Archive::Framework>& completedFrameworks,
const Future<bool>& readmit)
{
slaves.reregistering.erase(slaveInfo.id());
CHECK(!readmit.isDiscarded());
if (readmit.isFailed()) {
LOG(FATAL) << "Failed to readmit slave " << slaveInfo.id() << " at " << pid
<< " (" << slaveInfo.hostname() << "): " << readmit.failure();
} else if (!readmit.get()) {
LOG(WARNING) << "The slave " << slaveInfo.id() << " at "
<< pid << " (" << slaveInfo.hostname() << ") could not be"
<< " readmitted; shutting it down";
slaves.removed.put(slaveInfo.id(), Nothing());
ShutdownMessage message;
message.set_message(
"Slave attempted to re-register with unknown slave id " +
stringify(slaveInfo.id()));
send(pid, message);
} else {
// Re-admission succeeded.
Slave* slave = new Slave(slaveInfo, slaveInfo.id(), pid, Clock::now());
slave->reregisteredTime = Clock::now();
LOG(INFO) << "Re-registered slave " << *slave;
++metrics.slave_reregistrations;
readdSlave(slave, executorInfos, tasks, completedFrameworks);
__reregisterSlave(slave, tasks);
}
}
void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
{
// Send the latest framework pids to the slave.
hashset<UPID> pids;
foreach (const Task& task, tasks) {
Framework* framework = getFramework(task.framework_id());
if (framework != NULL && !pids.contains(framework->pid)) {
UpdateFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
message.set_pid(framework->pid);
send(slave->pid, message);
pids.insert(framework->pid);
}
}
}
void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
{
++metrics.messages_unregister_slave;
LOG(INFO) << "Asked to unregister slave " << slaveId;
Slave* slave = getSlave(slaveId);
if (slave != NULL) {
if (slave->pid != from) {
LOG(WARNING) << "Ignoring unregister slave message from " << from
<< " because it is not the slave " << slave->pid;
return;
}
removeSlave(slave);
}
}
// NOTE: We cannot use 'from' here to identify the slave as this is
// now sent by the StatusUpdateManagerProcess and master itself when
// it generates TASK_LOST messages. Only 'pid' can be used to identify
// the slave.
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
{
++metrics.messages_status_update;
if (slaves.removed.get(update.slave_id()).isSome()) {
// If the slave is removed, we have already informed
// frameworks that its tasks were LOST, so the slave should
// shut down.
LOG(WARNING) << "Ignoring status update " << update
<< " from removed slave " << pid
<< " with id " << update.slave_id() << " ; asking slave "
<< " to shutdown";
ShutdownMessage message;
message.set_message("Status update from unknown slave");
send(pid, message);
stats.invalidStatusUpdates++;
metrics.invalid_status_updates++;
return;
}
Slave* slave = getSlave(update.slave_id());
if (slave == NULL) {
LOG(WARNING) << "Ignoring status update " << update
<< " from unknown slave " << pid
<< " with id " << update.slave_id();
stats.invalidStatusUpdates++;
metrics.invalid_status_updates++;
return;
}
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
LOG(WARNING) << "Ignoring status update " << update
<< " from slave " << *slave
<< " because the framework is unknown";
stats.invalidStatusUpdates++;
metrics.invalid_status_updates++;
return;
}
// Forward the update to the framework.
forward(update, pid, framework);
// Lookup the task and see if we need to update anything locally.
const TaskStatus& status = update.status();
Task* task = slave->getTask(update.framework_id(), status.task_id());
if (task == NULL) {
LOG(WARNING) << "Could not lookup task for status update " << update
<< " from slave " << *slave;
stats.invalidStatusUpdates++;
metrics.invalid_status_updates++;
return;
}
LOG(INFO) << "Status update " << update << " from slave " << *slave;
// TODO(brenden): Consider wiping the 'data' and 'message' fields?
if (task->statuses_size() > 0 &&
task->statuses(task->statuses_size() - 1).state() == status.state()) {
task->mutable_statuses()->RemoveLast();
}
task->add_statuses()->CopyFrom(status);
task->set_state(status.state());
// Handle the task appropriately if it's terminated.
if (protobuf::isTerminalState(status.state())) {
removeTask(task);
}
stats.tasks[status.state()]++;
stats.validStatusUpdates++;
metrics.valid_status_updates++;
}
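// Sends (or forwards) a status update to the framework. The
// 'acknowledgee' pid is copied into the message; an empty pid
// indicates an update generated by the master itself.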
void Master::forward(
const StatusUpdate& update,
const UPID& acknowledgee,
Framework* framework)
{
CHECK_NOTNULL(framework);
if (!acknowledgee) {
LOG(INFO) << "Sending status update " << update
<< (update.status().has_message()
? " '" + update.status().message() + "'"
: "");
} else {
LOG(INFO) << "Forwarding status update " << update;
}
StatusUpdateMessage message;
message.mutable_update()->MergeFrom(update);
message.set_pid(acknowledgee);
send(framework->pid, message);
}
void Master::exitedExecutor(
const UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
int32_t status)
{
++metrics.messages_exited_executor;
if (slaves.removed.get(slaveId).isSome()) {
// If the slave is removed, we have already informed
// frameworks that its tasks were LOST, so the slave should
// shut down.
LOG(WARNING) << "Ignoring exited executor '" << executorId
<< "' of framework " << frameworkId
<< " on removed slave " << slaveId
<< " ; asking slave to shutdown";
ShutdownMessage message;
message.set_message("Executor exited message from unknown slave");
reply(message);
return;
}
// Only update master's internal data structures here for proper
// accounting. The TASK_LOST updates are handled by the slave.
if (!slaves.registered.contains(slaveId)) {
LOG(WARNING) << "Ignoring exited executor '" << executorId
<< "' of framework " << frameworkId
<< " on unknown slave " << slaveId;
return;
}
Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]);
// Tell the allocator about the recovered resources.
if (slave->hasExecutor(frameworkId, executorId)) {
ExecutorInfo executor = slave->executors[frameworkId][executorId];
LOG(INFO) << "Executor " << executorId
<< " of framework " << frameworkId
<< " on slave " << *slave << " "
<< WSTRINGIFY(status);
allocator->resourcesRecovered(
frameworkId, slaveId, Resources(executor.resources()), None());
// Remove executor from slave and framework.
slave->removeExecutor(frameworkId, executorId);
} else {
LOG(WARNING) << "Ignoring unknown exited executor "
<< executorId << " on slave " << *slave;
}
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
framework->removeExecutor(slave->id, executorId);
// TODO(benh): Send the framework its executor's exit status?
// Or maybe at least have something like
// Scheduler::executorLost?
}
}
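// Shuts down a registered slave with the given human-readable message
// and removes it from the master.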
void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
{
if (!slaves.registered.contains(slaveId)) {
// Possible when the SlaveObserver dispatched to shutdown a slave,
// but exited() was already called for this slave.
LOG(WARNING) << "Unable to shutdown unknown slave " << slaveId;
return;
}
Slave* slave = slaves.registered[slaveId];
CHECK_NOTNULL(slave);
LOG(WARNING) << "Shutting down slave " << *slave << " with message '"
<< message << "'";
ShutdownMessage message_;
message_.set_message(message);
send(slave->pid, message_);
removeSlave(slave);
}
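// Handles a task state reconciliation request from a framework. An
// empty 'statuses' list triggers implicit reconciliation (the latest
// state of every known and pending task is sent); otherwise explicit
// reconciliation is performed for the requested tasks only.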
void Master::reconcileTasks(
const UPID& from,
const FrameworkID& frameworkId,
const std::vector<TaskStatus>& statuses)
{
++metrics.messages_reconcile_tasks;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Unknown framework " << frameworkId << " at " << from
<< " attempted to reconcile tasks";
return;
}
if (from != framework->pid) {
LOG(WARNING)
<< "Ignoring reconcile tasks message for framework " << frameworkId
<< " from '" << from << "' because it is not from the registered"
<< " framework '" << framework->pid << "'";
return;
}
if (statuses.empty()) {
// Implicit reconciliation.
LOG(INFO) << "Performing implicit task state reconciliation for framework "
<< frameworkId;
foreachvalue (const TaskInfo& task, framework->pendingTasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
task.slave_id(),
task.task_id(),
TASK_STAGING,
"Reconciliation: Latest task state");
VLOG(1) << "Sending implicit reconciliation state "
<< update.status().state()
<< " for task " << update.status().task_id()
<< " of framework " << frameworkId;
// TODO(bmahler): Consider using forward(); might lead to too
// much logging.
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
send(framework->pid, message);
}
foreachvalue (Task* task, framework->tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
task->slave_id(),
task->task_id(),
task->state(),
"Reconciliation: Latest task state");
VLOG(1) << "Sending implicit reconciliation state "
<< update.status().state()
<< " for task " << update.status().task_id()
<< " of framework " << frameworkId;
// TODO(bmahler): Consider using forward(); might lead to too
// much logging.
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
send(framework->pid, message);
}
return;
}
// Explicit reconciliation.
LOG(INFO) << "Performing explicit task state reconciliation for "
<< statuses.size() << " tasks of framework " << frameworkId;
// Explicit reconciliation occurs for the following cases:
// (1) Task is known, but pending: TASK_STAGING.
// (2) Task is known: send the latest state.
// (3) Task is unknown, slave is registered: TASK_LOST.
// (4) Task is unknown, slave is transitioning: no-op.
// (5) Task is unknown, slave is unknown: TASK_LOST.
//
// When using a non-strict registry, case (5) may result in
// a TASK_LOST for a task that may later be non-terminal. This
// is better than no reply at all because the framework can take
// action for TASK_LOST. Later, if the task is running, the
// framework can discover it with implicit reconciliation and will
// be able to kill it.
foreach (const TaskStatus& status, statuses) {
Option<SlaveID> slaveId = None();
if (status.has_slave_id()) {
slaveId = status.slave_id();
}
Option<StatusUpdate> update = None();
Task* task = framework->getTask(status.task_id());
if (framework->pendingTasks.contains(status.task_id())) {
// (1) Task is known, but pending: TASK_STAGING.
const TaskInfo& task_ = framework->pendingTasks[status.task_id()];
update = protobuf::createStatusUpdate(
frameworkId,
task_.slave_id(),
task_.task_id(),
TASK_STAGING,
"Reconciliation: Latest task state");
} else if (task != NULL) {
// (2) Task is known: send the latest state.
update = protobuf::createStatusUpdate(
frameworkId,
task->slave_id(),
task->task_id(),
task->state(),
"Reconciliation: Latest task state");
} else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
// (3) Task is unknown, slave is registered: TASK_LOST.
update = protobuf::createStatusUpdate(
frameworkId,
slaveId.get(),
status.task_id(),
TASK_LOST,
"Reconciliation: Task is unknown to the slave");
} else if (slaves.transitioning(slaveId)) {
// (4) Task is unknown, slave is transitioning: no-op.
LOG(INFO) << "Ignoring reconciliation request of task "
<< status.task_id() << " from framework " << frameworkId
<< " because there are transitional slaves";
} else {
// (5) Task is unknown, slave is unknown: TASK_LOST.
update = protobuf::createStatusUpdate(
frameworkId,
slaveId,
status.task_id(),
TASK_LOST,
"Reconciliation: Task is unknown");
}
if (update.isSome()) {
VLOG(1) << "Sending explicit reconciliation state "
<< update.get().status().state()
<< " for task " << update.get().status().task_id()
<< " of framework " << frameworkId;
// TODO(bmahler): Consider using forward(); might lead to too
// much logging.
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update.get());
send(framework->pid, message);
}
}
}
void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
const Time& reregisteredTime)
{
Framework* framework = getFramework(frameworkId);
if (framework != NULL && !framework->active) {
// If the re-registration time has not changed, then the framework
// has not re-registered within the failover timeout.
if (framework->reregisteredTime == reregisteredTime) {
LOG(INFO) << "Framework failover timeout, removing framework "
<< framework->id;
removeFramework(framework);
}
}
}
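// Invoked by the allocator with a set of per-slave resource
// allocations for a framework. Creates an Offer for each valid slave,
// records it, and sends all of the offers to the framework in a
// single ResourceOffersMessage.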
void Master::offer(const FrameworkID& frameworkId,
const hashmap<SlaveID, Resources>& resources)
{
if (!frameworks.registered.contains(frameworkId) ||
!frameworks.registered[frameworkId]->active) {
LOG(WARNING) << "Master returning resources offered to framework "
<< frameworkId << " because the framework"
<< " has terminated or is inactive";
foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
allocator->resourcesRecovered(frameworkId, slaveId, offered, None());
}
return;
}
// Create an offer for each slave and add it to the message.
ResourceOffersMessage message;
Framework* framework = frameworks.registered[frameworkId];
foreachpair (const SlaveID& slaveId, const Resources& offered, resources) {
if (!slaves.registered.contains(slaveId)) {
LOG(WARNING) << "Master returning resources offered to framework "
<< frameworkId << " because slave " << slaveId
<< " is not valid";
allocator->resourcesRecovered(frameworkId, slaveId, offered, None());
continue;
}
Slave* slave = slaves.registered[slaveId];
CHECK(slave->info.checkpoint() || !framework->info.checkpoint())
<< "Resources of non checkpointing slave " << *slave
<< " are being offered to checkpointing framework " << frameworkId;
// This could happen if the allocator dispatched 'Master::offer' before
// it received 'Allocator::slaveRemoved' from the master.
if (slave->disconnected) {
LOG(WARNING) << "Master returning resources offered because slave "
<< *slave << " is disconnected";
allocator->resourcesRecovered(frameworkId, slaveId, offered, None());
continue;
}
#ifdef WITH_NETWORK_ISOLATOR
// TODO(dhamon): This flag is required as the static allocation of
// ephemeral ports leads to a maximum number of containers that can
// be created on each slave. Once MESOS-1654 is fixed and ephemeral
// ports are a first class resource, this can be removed.
if (flags.max_executors_per_slave.isSome()) {
// Check that we haven't hit the executor limit.
size_t numExecutors = 0;
foreachkey (const FrameworkID& frameworkId, slave->executors) {
numExecutors += slave->executors[frameworkId].keys().size();
}
if (numExecutors >= flags.max_executors_per_slave.get()) {
LOG(WARNING) << "Master returning resources offered because slave "
<< *slave << " has reached the maximum number of "
<< "executors";
// Pass a default filter to avoid getting this same offer immediately
// from the allocator.
allocator->resourcesRecovered(frameworkId, slaveId, offered, Filters());
continue;
}
}
#endif // WITH_NETWORK_ISOLATOR
Offer* offer = new Offer();
offer->mutable_id()->MergeFrom(newOfferId());
offer->mutable_framework_id()->MergeFrom(framework->id);
offer->mutable_slave_id()->MergeFrom(slave->id);
offer->set_hostname(slave->info.hostname());
offer->mutable_resources()->MergeFrom(offered);
offer->mutable_attributes()->MergeFrom(slave->info.attributes());
// Add all framework's executors running on this slave.
if (slave->executors.contains(framework->id)) {
const hashmap<ExecutorID, ExecutorInfo>& executors =
slave->executors[framework->id];
foreachkey (const ExecutorID& executorId, executors) {
offer->add_executor_ids()->MergeFrom(executorId);
}
}
offers[offer->id()] = offer;
framework->addOffer(offer);
slave->addOffer(offer);
// TODO(jieyu): For now, we strip 'ephemeral_ports' resource from
// offers so that frameworks do not see this resource. This is a
// short term workaround. Revisit this once we resolve MESOS-1654.
Offer offer_ = *offer;
offer_.clear_resources();
foreach (const Resource& resource, offered) {
if (resource.name() != "ephemeral_ports") {
offer_.add_resources()->CopyFrom(resource);
}
}
// Add the offer *AND* the corresponding slave's PID.
message.add_offers()->MergeFrom(offer_);
message.add_pids(slave->pid);
}
if (message.offers().size() == 0) {
return;
}
LOG(INFO) << "Sending " << message.offers().size()
<< " offers to framework " << framework->id;
send(framework->pid, message);
}
// TODO(vinod): If due to network partition there are two instances
// of the framework that think they are leaders and try to
// authenticate with master they would be stepping on each other's
// toes. Currently it is tricky to detect this case because the
// 'authenticate' message doesn't contain the 'FrameworkID'.
void Master::authenticate(const UPID& from, const UPID& pid)
{
++metrics.messages_authenticate;
// Deactivate the framework/slave if it's already registered.
// TODO(adam-mesos): MESOS-1081: Do not deactivate the current
// framework/slave before we find out if the new one is legit.
bool found = false;
foreachvalue (Framework* framework, frameworks.registered) {
if (framework->pid == pid) {
deactivate(framework);
found = true;
break;
}
}
if (!found) {
foreachvalue (Slave* slave, slaves.registered) {
if (slave->pid == pid) {
disconnect(slave);
break;
}
}
}
authenticated.erase(pid);
if (authenticating.contains(pid)) {
LOG(INFO) << "Queuing up authentication request from " << pid
<< " because authentication is still in progress";
// Try to cancel the in-progress authentication by deleting
// the authenticator.
authenticators.erase(pid);
// Retry after the current authenticator finishes.
authenticating[pid]
.onAny(defer(self(), &Self::authenticate, from, pid));
return;
}
LOG(INFO) << "Authenticating " << pid;
// Create a promise to capture the entire "authenticating"
// procedure. We'll set this _after_ we finish _authenticate.
Owned<Promise<Nothing> > promise(new Promise<Nothing>());
// Create the authenticator.
Owned<sasl::Authenticator> authenticator(new sasl::Authenticator(from));
// Start authentication.
const Future<Option<string> >& future = authenticator->authenticate()
.onAny(defer(self(), &Self::_authenticate, pid, promise, lambda::_1));
// Don't wait forever for authentication to complete.
delay(Seconds(5),
self(),
&Self::authenticationTimeout,
future);
// Save our state.
authenticating[pid] = promise->future();
authenticators.put(pid, authenticator);
}
void Master::_authenticate(
const UPID& pid,
const Owned<Promise<Nothing> >& promise,
const Future<Option<string> >& future)
{
if (!future.isReady() || future.get().isNone()) {
const string& error = future.isReady()
? "Refused authentication"
: (future.isFailed() ? future.failure() : "future discarded");
LOG(WARNING) << "Failed to authenticate " << pid
<< ": " << error;
promise->fail(error);
} else {
LOG(INFO) << "Successfully authenticated principal '" << future.get().get()
<< "' at " << pid;
promise->set(Nothing());
authenticated.put(pid, future.get().get());
}
authenticators.erase(pid);
authenticating.erase(pid);
}
void Master::authenticationTimeout(Future<Option<string> > future)
{
// Note that a 'discard' here is safe even if another
// authenticator is in progress because this copy of the future
// corresponds to the original authenticator that started the timer.
if (future.discard()) { // This is a no-op if the future is already ready.
LOG(WARNING) << "Authentication timed out";
}
}
// Returns the registered frameworks that are currently active, i.e.,
// connected and not in the process of being removed.
vector<Framework*> Master::getActiveFrameworks() const
{
vector <Framework*> result;
foreachvalue (Framework* framework, frameworks.registered) {
if (framework->active) {
result.push_back(framework);
}
}
return result;
}
// NOTE: This function is only called when the slave re-registers
// with a master that already knows about it (i.e., not a failed
// over master).
void Master::reconcile(
Slave* slave,
const vector<ExecutorInfo>& executors,
const vector<Task>& tasks)
{
CHECK_NOTNULL(slave);
// We convert the 'tasks' into a map for easier lookup below.
// TODO(vinod): Check if the tasks are known to the master.
multihashmap<FrameworkID, TaskID> slaveTasks;
foreach (const Task& task, tasks) {
slaveTasks.put(task.framework_id(), task.task_id());
}
// Send TASK_LOST updates for tasks present in the master but
// missing from the slave. This could happen if the task was
// dropped by the slave (e.g., slave exited before getting the
// task or the task was launched while slave was in recovery).
// NOTE: keys() and values() are used since statusUpdate()
// modifies slave->tasks.
foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
foreach (Task* task, slave->tasks[frameworkId].values()) {
if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
<< " of framework " << task->framework_id()
<< " unknown to the slave " << *slave;
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
slave->id,
task->task_id(),
TASK_LOST,
"Task is unknown to the slave");
statusUpdate(update, UPID());
}
}
}
// Likewise, any executors that are present in the master but
// not present in the slave must be removed to correctly account
// for resources. First we index the executors for fast lookup below.
multihashmap<FrameworkID, ExecutorID> slaveExecutors;
foreach (const ExecutorInfo& executor, executors) {
// TODO(bmahler): The slave ensures the framework id is set in the
// executor info when re-registering. This can be killed in 0.15.0
// as we've added code in 0.14.0 to ensure the framework id is set
// in the scheduler driver.
if (!executor.has_framework_id()) {
LOG(ERROR) << "Slave " << *slave
<< " re-registered with executor " << executor.executor_id()
<< " without setting the framework id";
continue;
}
slaveExecutors.put(executor.framework_id(), executor.executor_id());
}
// Now that we have the index for lookup, remove all the executors
// in the master that are not known to the slave.
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
foreachkey (const ExecutorID& executorId,
utils::copy(slave->executors[frameworkId])) {
if (!slaveExecutors.contains(frameworkId, executorId)) {
LOG(WARNING) << "Removing executor " << executorId << " of framework "
<< frameworkId << " as it is unknown to the slave "
<< *slave;
// TODO(bmahler): This is duplicated in several locations, we
// may benefit from a method for removing an executor from
// all the relevant data structures and the allocator, akin
// to removeTask().
allocator->resourcesRecovered(
frameworkId,
slave->id,
slave->executors[frameworkId][executorId].resources(),
None());
slave->removeExecutor(frameworkId, executorId);
if (frameworks.registered.contains(frameworkId)) {
frameworks.registered[frameworkId]->removeExecutor(
slave->id, executorId);
}
}
}
}
// Send KillTaskMessages for tasks in 'killedTasks' that are
// still alive on the slave. This could happen if the slave
// did not receive KillTaskMessage because of a partition or
// disconnection.
foreach (const Task& task, tasks) {
if (!protobuf::isTerminalState(task.state()) &&
slave->killedTasks.contains(task.framework_id(), task.task_id())) {
LOG(WARNING) << " Slave " << *slave
<< " has non-terminal task " << task.task_id()
<< " that is supposed to be killed. Killing it now!";
KillTaskMessage message;
message.mutable_framework_id()->MergeFrom(task.framework_id());
message.mutable_task_id()->MergeFrom(task.task_id());
send(slave->pid, message);
}
}
// Send ShutdownFrameworkMessages for frameworks that are completed.
// This could happen if the message wasn't received by the slave
// (e.g., slave was down, partitioned).
// NOTE: This is a short-term hack because this information is lost
// when the master fails over. Also, we only store a limited number
// of completed frameworks.
// TODO(vinod): Revisit this when registrar is in place. It would
// likely involve storing this information in the registrar.
foreach (const shared_ptr<Framework>& framework, frameworks.completed) {
if (slaveTasks.contains(framework->id)) {
LOG(WARNING)
<< "Slave " << *slave
<< " re-registered with completed framework " << framework->id
<< ". Shutting down the framework on the slave";
ShutdownFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
send(slave->pid, message);
}
}
}
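// Adds a newly registered framework to the master's bookkeeping,
// notifies the allocator, sends the FrameworkRegisteredMessage and
// exports per-principal framework metrics.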
void Master::addFramework(Framework* framework)
{
CHECK(!frameworks.registered.contains(framework->id))
<< "Framework " << framework->id << "already exists!";
frameworks.registered[framework->id] = framework;
link(framework->pid);
// Enforced by Master::registerFramework.
CHECK(roles.contains(framework->info.role()))
<< "Unknown role " << framework->info.role()
<< " of framework " << framework->id;
roles[framework->info.role()]->addFramework(framework);
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
message.mutable_master_info()->MergeFrom(info_);
send(framework->pid, message);
allocator->frameworkAdded(
framework->id, framework->info, framework->resources);
// Export framework metrics.
// If the framework is authenticated, its principal should be in
// 'authenticated'. Otherwise look if it's supplied in
// FrameworkInfo.
Option<string> principal = authenticated.get(framework->pid);
if (principal.isNone() && framework->info.has_principal()) {
principal = framework->info.principal();
}
CHECK(!frameworks.principals.contains(framework->pid));
frameworks.principals.put(framework->pid, principal);
// Export framework metrics if a principal is specified.
if (principal.isSome()) {
// Create new framework metrics if this framework is the first
// one of this principal. Otherwise existing metrics are reused.
if (!metrics.frameworks.contains(principal.get())) {
metrics.frameworks.put(
principal.get(),
Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
}
}
}
// Replace the scheduler for a framework with a new process ID, in the
// event of a scheduler failover.
void Master::failoverFramework(Framework* framework, const UPID& newPid)
{
const UPID oldPid = framework->pid;
// There are a few failover cases to consider:
// 1. The pid has changed. In this case we definitely want to
// send a FrameworkErrorMessage to shut down the older
// scheduler.
// 2. The pid has not changed.
// 2.1 The old scheduler on that pid failed over to a new
// instance on the same pid. No need to shut down the old
// scheduler as it is necessarily dead.
// 2.2 This is a duplicate message. In this case, the scheduler
// has not failed over, so we do not want to shut it down.
if (oldPid != newPid) {
FrameworkErrorMessage message;
message.set_message("Framework failed over");
send(oldPid, message);
}
// TODO(benh): unlink(oldPid);
framework->pid = newPid;
link(newPid);
// Make sure we can get offers again.
// TODO(vinod): Do this after we recover resources below.
if (!framework->active) {
framework->active = true;
allocator->frameworkActivated(framework->id, framework->info);
}
// The scheduler driver safely ignores any duplicate registration
// messages, so we don't need to compare the old and new pids here.
{
FrameworkRegisteredMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
message.mutable_master_info()->MergeFrom(info_);
send(newPid, message);
}
// Remove the framework's offers (if they weren't removed before).
// We do this after we have updated the pid and sent the framework
// registered message so that the allocator can immediately re-offer
// these resources to this framework if it wants.
foreach (Offer* offer, utils::copy(framework->offers)) {
allocator->resourcesRecovered(
offer->framework_id(), offer->slave_id(), offer->resources(), None());
removeOffer(offer);
}
// 'Failover' the framework's metrics, i.e., change the lookup key
// for its metrics to 'newPid'.
if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
frameworks.principals[newPid] = frameworks.principals[oldPid];
frameworks.principals.erase(oldPid);
}
}
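// Removes a framework from the master: slaves are told to shut the
// framework down, its tasks, offers and executors are cleaned up
// (returning resources to the allocator), and the framework is moved
// to the completed frameworks buffer.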
void Master::removeFramework(Framework* framework)
{
CHECK_NOTNULL(framework);
LOG(INFO) << "Removing framework " << framework->id;
if (framework->active) {
// Tell the allocator to stop allocating resources to this framework.
// TODO(vinod): Consider setting framework->active to false here
// or just calling 'deactivate(Framework*)'.
allocator->frameworkDeactivated(framework->id);
}
// Tell slaves to shutdown the framework.
foreachvalue (Slave* slave, slaves.registered) {
ShutdownFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework->id);
send(slave->pid, message);
}
// Remove the pending tasks from the framework.
framework->pendingTasks.clear();
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
Slave* slave = getSlave(task->slave_id());
// Since we only find out about tasks when the slave re-registers,
// it must be the case that the slave exists!
CHECK(slave != NULL)
<< "Unknown slave " << task->slave_id()
<< " for task " << task->task_id();
removeTask(task);
}
// Remove the framework's offers (if they weren't removed before).
foreach (Offer* offer, utils::copy(framework->offers)) {
allocator->resourcesRecovered(offer->framework_id(),
offer->slave_id(),
Resources(offer->resources()),
None());
removeOffer(offer);
}
// Remove the framework's executors for correct resource accounting.
foreachkey (const SlaveID& slaveId, framework->executors) {
Slave* slave = getSlave(slaveId);
if (slave != NULL) {
foreachpair (const ExecutorID& executorId,
const ExecutorInfo& executorInfo,
framework->executors[slaveId]) {
allocator->resourcesRecovered(
framework->id,
slave->id,
executorInfo.resources(),
None());
slave->removeExecutor(framework->id, executorId);
}
}
}
// TODO(benh): Similar code between removeFramework and
// failoverFramework needs to be shared!
// TODO(benh): unlink(framework->pid);
framework->unregisteredTime = Clock::now();
// The completed frameworks buffer now owns the framework pointer.
frameworks.completed.push_back(shared_ptr<Framework>(framework));
CHECK(roles.contains(framework->info.role()))
<< "Unknown role " << framework->info.role()
<< " of framework " << framework->id;
roles[framework->info.role()]->removeFramework(framework);
// Remove the framework from authenticated.
authenticated.erase(framework->pid);
CHECK(frameworks.principals.contains(framework->pid));
const Option<string> principal = frameworks.principals[framework->pid];
frameworks.principals.erase(framework->pid);
// Remove the framework's message counters.
if (principal.isSome()) {
// Remove the metrics for the principal if this framework is the
// last one with this principal.
if (!frameworks.principals.containsValue(principal.get())) {
CHECK(metrics.frameworks.contains(principal.get()));
metrics.frameworks.erase(principal.get());
}
}
// Remove the framework.
frameworks.registered.erase(framework->id);
allocator->frameworkRemoved(framework->id);
}
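// Remove a framework's state from a single slave: send TASK_LOST
// updates for its tasks on that slave and recover the resources of
// its executors there. The framework itself remains registered.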
void Master::removeFramework(Slave* slave, Framework* framework)
{
CHECK_NOTNULL(slave);
CHECK_NOTNULL(framework);
LOG(INFO) << "Removing framework " << framework->id
<< " from slave " << *slave;
// Remove pointers to framework's tasks in slaves, and send status
// updates.
// NOTE: values() is used because statusUpdate() modifies
// slave->tasks.
foreach (Task* task, slave->tasks[framework->id].values()) {
// Remove tasks that belong to this framework.
if (task->framework_id() == framework->id) {
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected yet. For more info
// please see the comments in 'removeFramework(Framework*)'.
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
task->slave_id(),
task->task_id(),
TASK_LOST,
"Slave " + slave->info.hostname() + " disconnected",
(task->has_executor_id()
? Option<ExecutorID>(task->executor_id()) : None()));
statusUpdate(update, UPID());
}
}
// Remove the framework's executors from the slave and framework
// for proper resource accounting.
if (slave->executors.contains(framework->id)) {
foreachkey (const ExecutorID& executorId,
utils::copy(slave->executors[framework->id])) {
allocator->resourcesRecovered(
framework->id,
slave->id,
slave->executors[framework->id][executorId].resources(),
None());
framework->removeExecutor(slave->id, executorId);
slave->removeExecutor(framework->id, executorId);
}
}
}
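// Add a (re-)registering slave: record it as registered, link to its
// pid, acknowledge the (re-)registration, and spawn a SlaveObserver to
// watch it. For a fresh registration the allocator is notified here;
// for a re-registration, 'readdSlave()' notifies it with the resources
// already allocated.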
void Master::addSlave(Slave* slave, bool reregister)
{
CHECK_NOTNULL(slave);
LOG(INFO) << "Adding slave " << *slave
<< " with " << slave->info.resources();
slaves.removed.erase(slave->id);
slaves.registered[slave->id] = slave;
link(slave->pid);
if (!reregister) {
SlaveRegisteredMessage message;
message.mutable_slave_id()->MergeFrom(slave->id);
send(slave->pid, message);
} else {
SlaveReregisteredMessage message;
message.mutable_slave_id()->MergeFrom(slave->id);
send(slave->pid, message);
}
// Set up an observer for the slave.
slave->observer = new SlaveObserver(
slave->pid, slave->info, slave->id, self());
spawn(slave->observer);
if (!reregister) {
allocator->slaveAdded(
slave->id, slave->info, hashmap<FrameworkID, Resources>());
}
}
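// Re-add a slave that is re-registering (e.g., after a master
// failover): add the slave itself, rebuild the executor and task
// bookkeeping it reported, and tell the allocator which resources are
// already allocated to which frameworks.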
void Master::readdSlave(
Slave* slave,
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks,
const vector<Archive::Framework>& completedFrameworks)
{
CHECK_NOTNULL(slave);
addSlave(slave, true);
// Add the executors and tasks to the slave and framework state and
// determine the resources that have been allocated to frameworks.
hashmap<FrameworkID, Resources> resources;
foreach (const ExecutorInfo& executorInfo, executorInfos) {
// TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
// Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
// to set it, and we could remove this CHECK if desired.
CHECK(executorInfo.has_framework_id())
<< "Executor " << executorInfo.executor_id()
<< " doesn't have frameworkId set";
if (!slave->hasExecutor(executorInfo.framework_id(),
executorInfo.executor_id())) {
slave->addExecutor(executorInfo.framework_id(), executorInfo);
}
Framework* framework = getFramework(executorInfo.framework_id());
if (framework != NULL) {
if (!framework->hasExecutor(slave->id, executorInfo.executor_id())) {
framework->addExecutor(slave->id, executorInfo);
}
}
resources[executorInfo.framework_id()] += executorInfo.resources();
}
foreach (const Task& task, tasks) {
// Ignore tasks that have reached terminal state.
if (protobuf::isTerminalState(task.state())) {
continue;
}
Task* t = new Task(task);
// Add the task to the slave.
slave->addTask(t);
// Try to add the task to the framework as well. The framework might
// not be connected (or even registered) yet, in which case the task
// will be added when the framework (re-)registers later.
Framework* framework = getFramework(task.framework_id());
if (framework != NULL) {
framework->addTask(t);
} else {
// TODO(benh): We should really put a timeout on how long we
// keep tasks running on a slave that never have frameworks
// reregister and claim them.
LOG(WARNING) << "Possibly orphaned task " << task.task_id()
<< " of framework " << task.framework_id()
<< " running on slave " << *slave;
}
resources[task.framework_id()] += task.resources();
}
// Re-add completed tasks reported by the slave.
// Note that a slave considers a framework completed when it has no
// tasks/executors running for that framework, whereas the master
// considers a framework completed only when the framework is removed
// after a failover timeout.
// TODO(vinod): Reconcile the notion of a completed framework across the
// master and slave.
foreach (const Archive::Framework& completedFramework, completedFrameworks) {
const FrameworkID& frameworkId = completedFramework.framework_info().id();
Framework* framework = getFramework(frameworkId);
foreach (const Task& task, completedFramework.tasks()) {
if (framework != NULL) {
VLOG(2) << "Re-adding completed task " << task.task_id()
<< " of framework " << task.framework_id()
<< " that ran on slave " << *slave;
framework->addCompletedTask(task);
} else {
// We could be here if the framework hasn't registered yet.
// TODO(vinod): Revisit these semantics when we store frameworks'
// information in the registrar.
LOG(WARNING) << "Possibly orphaned completed task " << task.task_id()
<< " of framework " << task.framework_id()
<< " that ran on slave " << *slave;
}
}
}
allocator->slaveAdded(slave->id, slave->info, resources);
}
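// Remove a slave: notify the allocator first (so the resources
// recovered below are ignored), transition its tasks to TASK_LOST
// while buffering the updates, rescind its outstanding offers, recover
// executor resources, terminate the slave observer, and finally remove
// the slave from the registrar before '_removeSlave()' forwards the
// buffered updates.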
void Master::removeSlave(Slave* slave)
{
CHECK_NOTNULL(slave);
LOG(INFO) << "Removing slave " << *slave;
// We do this first to make sure any of the resources recovered
// below (e.g., by removeTask()) are ignored by the allocator.
allocator->slaveRemoved(slave->id);
// Transition the tasks to lost and remove them, BUT do not send
// updates. Rather, build up the updates so that we can send them
// after the slave is removed from the registry.
vector<StatusUpdate> updates;
foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
foreach (Task* task, slave->tasks[frameworkId].values()) {
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
task->slave_id(),
task->task_id(),
TASK_LOST,
"Slave " + slave->info.hostname() + " removed",
(task->has_executor_id() ?
Option<ExecutorID>(task->executor_id()) : None()));
task->add_statuses()->CopyFrom(update.status());
task->set_state(update.status().state());
removeTask(task);
updates.push_back(update);
}
}
foreach (Offer* offer, utils::copy(slave->offers)) {
// TODO(vinod): We don't need to call 'Allocator::resourcesRecovered'
// once MESOS-621 is fixed.
allocator->resourcesRecovered(
offer->framework_id(), slave->id, offer->resources(), None());
// Remove and rescind offers.
removeOffer(offer, true); // Rescind!
}
// Remove executors from the slave for proper resource accounting.
foreachkey (const FrameworkID& frameworkId, slave->executors) {
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
foreachkey (const ExecutorID& executorId, slave->executors[frameworkId]) {
// TODO(vinod): We don't need to call 'Allocator::resourcesRecovered'
// once MESOS-621 is fixed.
allocator->resourcesRecovered(
frameworkId,
slave->id,
slave->executors[frameworkId][executorId].resources(),
None());
framework->removeExecutor(slave->id, executorId);
}
}
}
// Mark the slave as being removed.
slaves.removing.insert(slave->id);
slaves.registered.erase(slave->id);
slaves.removed.put(slave->id, Nothing());
authenticated.erase(slave->pid);
// Kill the slave observer.
terminate(slave->observer);
wait(slave->observer);
delete slave->observer;
// TODO(benh): unlink(slave->pid);
// Remove this slave from the registrar. Once this is completed, we
// can forward the LOST task updates to the frameworks and notify
// all frameworks that this slave was lost.
registrar->apply(Owned<Operation>(new RemoveSlave(slave->info)))
.onAny(defer(self(),
&Self::_removeSlave,
slave->info,
updates,
lambda::_1));
delete slave;
}
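// Continuation of 'removeSlave()', invoked once the registrar
// operation completes: forwards the buffered TASK_LOST updates to
// their frameworks and notifies all registered frameworks of the lost
// slave.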
void Master::_removeSlave(
const SlaveInfo& slaveInfo,
const vector<StatusUpdate>& updates,
const Future<bool>& removed)
{
slaves.removing.erase(slaveInfo.id());
CHECK(!removed.isDiscarded());
if (removed.isFailed()) {
LOG(FATAL) << "Failed to remove slave " << slaveInfo.id()
<< " (" << slaveInfo.hostname() << ")"
<< " from the registrar: " << removed.failure();
}
CHECK(removed.get())
<< "Slave " << slaveInfo.id() << " (" << slaveInfo.hostname() << ") "
<< "already removed from the registrar";
LOG(INFO) << "Removed slave " << slaveInfo.id() << " ("
<< slaveInfo.hostname() << ")";
++metrics.slave_removals;
// Forward the LOST updates on to the framework.
foreach (const StatusUpdate& update, updates) {
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
LOG(WARNING) << "Dropping update " << update << " because the framework "
<< "is unknown";
} else {
forward(update, UPID(), framework);
}
}
// Notify all frameworks of the lost slave.
foreachvalue (Framework* framework, frameworks.registered) {
LOG(INFO) << "Notifying framework " << framework->id << " of lost slave "
<< slaveInfo.id() << " (" << slaveInfo.hostname() << ") "
<< "after recovering";
LostSlaveMessage message;
message.mutable_slave_id()->MergeFrom(slaveInfo.id());
send(framework->pid, message);
}
}
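// Remove a task from its framework (if currently registered) and its
// slave, recover its resources via the allocator, and update the task
// state metrics before deleting it.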
void Master::removeTask(Task* task)
{
CHECK_NOTNULL(task);
// Remove from framework.
Framework* framework = getFramework(task->framework_id());
if (framework != NULL) { // A framework might not be re-connected yet.
framework->removeTask(task);
}
// Remove from slave.
Slave* slave = getSlave(task->slave_id());
CHECK_NOTNULL(slave);
slave->removeTask(task);
// Tell the allocator about the recovered resources.
allocator->resourcesRecovered(
task->framework_id(),
task->slave_id(),
Resources(task->resources()),
None());
// Update the task state metric.
switch (task->state()) {
case TASK_FINISHED: ++metrics.tasks_finished; break;
case TASK_FAILED: ++metrics.tasks_failed; break;
case TASK_KILLED: ++metrics.tasks_killed; break;
case TASK_LOST: ++metrics.tasks_lost; break;
default:
LOG(WARNING) << "Removing task " << task->task_id()
<< " of framework " << task->framework_id()
<< " and slave " << task->slave_id()
<< " in non-terminal state " << task->state();
break;
}
delete task;
}
// TODO(vinod): Instead of 'removeOffer()', consider implementing
// 'useOffer()', 'discardOffer()' and 'rescindOffer()' for clarity.
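// Removes the offer from the framework, the slave and the master's
// offer map, optionally sending a RescindResourceOfferMessage to the
// framework first.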
void Master::removeOffer(Offer* offer, bool rescind)
{
// Remove from framework.
Framework* framework = getFramework(offer->framework_id());
CHECK(framework != NULL)
<< "Unknown framework " << offer->framework_id()
<< " in the offer " << offer->id();
framework->removeOffer(offer);
// Remove from slave.
Slave* slave = getSlave(offer->slave_id());
CHECK(slave != NULL)
<< "Unknown slave " << offer->slave_id()
<< " in the offer " << offer->id();
slave->removeOffer(offer);
if (rescind) {
RescindResourceOfferMessage message;
message.mutable_offer_id()->MergeFrom(offer->id());
send(framework->pid, message);
}
// Delete it.
offers.erase(offer->id());
delete offer;
}
// TODO(bmahler): Consider killing this.
Framework* Master::getFramework(const FrameworkID& frameworkId)
{
return frameworks.registered.contains(frameworkId)
? frameworks.registered[frameworkId]
: NULL;
}
// TODO(bmahler): Consider killing this.
Slave* Master::getSlave(const SlaveID& slaveId)
{
return slaves.registered.contains(slaveId)
? slaves.registered[slaveId]
: NULL;
}
// TODO(bmahler): Consider killing this.
Offer* Master::getOffer(const OfferID& offerId)
{
return offers.contains(offerId) ? offers[offerId] : NULL;
}
// Create a new framework ID. We format the ID as MASTERID-FWID, where
// MASTERID is the ID of the master (launch date plus fault tolerant ID)
// and FWID is an increasing integer.
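// For example, with the 4-digit zero-padded counter used below, a
// framework might receive an ID like "<master id>-0042" (the exact
// MASTERID portion depends on how 'info_.id()' was generated).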
FrameworkID Master::newFrameworkId()
{
std::ostringstream out;
out << info_.id() << "-" << std::setw(4)
<< std::setfill('0') << nextFrameworkId++;
FrameworkID frameworkId;
frameworkId.set_value(out.str());
return frameworkId;
}
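// Offer and slave IDs follow the same MASTERID-<counter> pattern,
// but without the zero padding used for framework IDs.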
OfferID Master::newOfferId()
{
OfferID offerId;
offerId.set_value(info_.id() + "-" + stringify(nextOfferId++));
return offerId;
}
SlaveID Master::newSlaveId()
{
SlaveID slaveId;
slaveId.set_value(info_.id() + "-" + stringify(nextSlaveId++));
return slaveId;
}
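// Gauge callbacks. These are invoked via 'defer' (see the Metrics
// constructor below), so they execute on the master actor and can
// safely read the master's state.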
double Master::_slaves_active()
{
double count = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
if (!slave->disconnected) {
count++;
}
}
return count;
}
// TODO(alexandra.sava): Also count the slaves that have been
// deactivated via HTTP POST, once MESOS-1476 is committed.
double Master::_slaves_inactive()
{
double count = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
if (slave->disconnected) {
count++;
}
}
return count;
}
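// Active task states are computed on demand by the gauges below;
// terminal states (finished, failed, killed, lost) are tracked by
// the counters incremented in 'removeTask()'.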
double Master::_tasks_staging()
{
double count = 0.0;
// Add the tasks pending validation / authorization.
foreachvalue (Framework* framework, frameworks.registered) {
count += framework->pendingTasks.size();
}
foreachvalue (Slave* slave, slaves.registered) {
typedef hashmap<TaskID, Task*> TaskMap;
foreachvalue (const TaskMap& tasks, slave->tasks) {
foreachvalue (const Task* task, tasks) {
if (task->state() == TASK_STAGING) {
count++;
}
}
}
}
return count;
}
double Master::_tasks_starting()
{
double count = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
typedef hashmap<TaskID, Task*> TaskMap;
foreachvalue (const TaskMap& tasks, slave->tasks) {
foreachvalue (const Task* task, tasks) {
if (task->state() == TASK_STARTING) {
count++;
}
}
}
}
return count;
}
double Master::_tasks_running()
{
double count = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
typedef hashmap<TaskID, Task*> TaskMap;
foreachvalue (const TaskMap& tasks, slave->tasks) {
foreachvalue (const Task* task, tasks) {
if (task->state() == TASK_RUNNING) {
count++;
}
}
}
}
return count;
}
// TODO(dhamon): Consider moving to master/metrics.cpp|hpp.
// Message counters are named with a "messages_" prefix so they can
// be grouped together alphabetically in the output.
// TODO(alexandra.sava): Add metrics for registered and removed slaves.
Master::Metrics::Metrics(const Master& master)
: uptime_secs(
"master/uptime_secs",
defer(master, &Master::_uptime_secs)),
elected(
"master/elected",
defer(master, &Master::_elected)),
slaves_active(
"master/slaves_active",
defer(master, &Master::_slaves_active)),
slaves_inactive(
"master/slaves_inactive",
defer(master, &Master::_slaves_inactive)),
frameworks_active(
"master/frameworks_active",
defer(master, &Master::_frameworks_active)),
frameworks_inactive(
"master/frameworks_inactive",
defer(master, &Master::_frameworks_inactive)),
outstanding_offers(
"master/outstanding_offers",
defer(master, &Master::_outstanding_offers)),
tasks_staging(
"master/tasks_staging",
defer(master, &Master::_tasks_staging)),
tasks_starting(
"master/tasks_starting",
defer(master, &Master::_tasks_starting)),
tasks_running(
"master/tasks_running",
defer(master, &Master::_tasks_running)),
tasks_finished(
"master/tasks_finished"),
tasks_failed(
"master/tasks_failed"),
tasks_killed(
"master/tasks_killed"),
tasks_lost(
"master/tasks_lost"),
dropped_messages(
"master/dropped_messages"),
messages_register_framework(
"master/messages_register_framework"),
messages_reregister_framework(
"master/messages_reregister_framework"),
messages_unregister_framework(
"master/messages_unregister_framework"),
messages_deactivate_framework(
"master/messages_deactivate_framework"),
messages_kill_task(
"master/messages_kill_task"),
messages_status_update_acknowledgement(
"master/messages_status_update_acknowledgement"),
messages_resource_request(
"master/messages_resource_request"),
messages_launch_tasks(
"master/messages_launch_tasks"),
messages_revive_offers(
"master/messages_revive_offers"),
messages_reconcile_tasks(
"master/messages_reconcile_tasks"),
messages_framework_to_executor(
"master/messages_framework_to_executor"),
messages_register_slave(
"master/messages_register_slave"),
messages_reregister_slave(
"master/messages_reregister_slave"),
messages_unregister_slave(
"master/messages_unregister_slave"),
messages_status_update(
"master/messages_status_update"),
messages_exited_executor(
"master/messages_exited_executor"),
messages_authenticate(
"master/messages_authenticate"),
valid_framework_to_executor_messages(
"master/valid_framework_to_executor_messages"),
invalid_framework_to_executor_messages(
"master/invalid_framework_to_executor_messages"),
valid_status_updates(
"master/valid_status_updates"),
invalid_status_updates(
"master/invalid_status_updates"),
valid_status_update_acknowledgements(
"master/valid_status_update_acknowledgements"),
invalid_status_update_acknowledgements(
"master/invalid_status_update_acknowledgements"),
recovery_slave_removals(
"master/recovery_slave_removals"),
event_queue_messages(
"master/event_queue_messages",
defer(master, &Master::_event_queue_messages)),
event_queue_dispatches(
"master/event_queue_dispatches",
defer(master, &Master::_event_queue_dispatches)),
event_queue_http_requests(
"master/event_queue_http_requests",
defer(master, &Master::_event_queue_http_requests)),
slave_registrations(
"master/slave_registrations"),
slave_reregistrations(
"master/slave_reregistrations"),
slave_removals(
"master/slave_removals")
{
// TODO(dhamon): Check return values of 'add'.
process::metrics::add(uptime_secs);
process::metrics::add(elected);
process::metrics::add(slaves_active);
process::metrics::add(slaves_inactive);
process::metrics::add(frameworks_active);
process::metrics::add(frameworks_inactive);
process::metrics::add(outstanding_offers);
process::metrics::add(tasks_staging);
process::metrics::add(tasks_starting);
process::metrics::add(tasks_running);
process::metrics::add(tasks_finished);
process::metrics::add(tasks_failed);
process::metrics::add(tasks_killed);
process::metrics::add(tasks_lost);
process::metrics::add(dropped_messages);
// Messages from schedulers.
process::metrics::add(messages_register_framework);
process::metrics::add(messages_reregister_framework);
process::metrics::add(messages_unregister_framework);
process::metrics::add(messages_deactivate_framework);
process::metrics::add(messages_kill_task);
process::metrics::add(messages_status_update_acknowledgement);
process::metrics::add(messages_resource_request);
process::metrics::add(messages_launch_tasks);
process::metrics::add(messages_revive_offers);
process::metrics::add(messages_reconcile_tasks);
process::metrics::add(messages_framework_to_executor);
// Messages from slaves.
process::metrics::add(messages_register_slave);
process::metrics::add(messages_reregister_slave);
process::metrics::add(messages_unregister_slave);
process::metrics::add(messages_status_update);
process::metrics::add(messages_exited_executor);
// Messages from both schedulers and slaves.
process::metrics::add(messages_authenticate);
process::metrics::add(valid_framework_to_executor_messages);
process::metrics::add(invalid_framework_to_executor_messages);
process::metrics::add(valid_status_updates);
process::metrics::add(invalid_status_updates);
process::metrics::add(valid_status_update_acknowledgements);
process::metrics::add(invalid_status_update_acknowledgements);
process::metrics::add(recovery_slave_removals);
process::metrics::add(event_queue_messages);
process::metrics::add(event_queue_dispatches);
process::metrics::add(event_queue_http_requests);
process::metrics::add(slave_registrations);
process::metrics::add(slave_reregistrations);
process::metrics::add(slave_removals);
// Create resource gauges.
// TODO(dhamon): Set these up dynamically when adding a slave based on the
// resources the slave exposes.
const string resources[] = {"cpus", "mem", "disk"};
foreach (const string& resource, resources) {
process::metrics::Gauge totalGauge(
"master/" + resource + "_total",
defer(master, &Master::_resources_total, resource));
resources_total.push_back(totalGauge);
process::metrics::add(totalGauge);
process::metrics::Gauge usedGauge(
"master/" + resource + "_used",
defer(master, &Master::_resources_used, resource));
resources_used.push_back(usedGauge);
process::metrics::add(usedGauge);
process::metrics::Gauge percentGauge(
"master/" + resource + "_percent",
defer(master, &Master::_resources_percent, resource));
resources_percent.push_back(percentGauge);
process::metrics::add(percentGauge);
}
}
Master::Metrics::~Metrics()
{
// TODO(dhamon): Check return values of 'remove'.
process::metrics::remove(uptime_secs);
process::metrics::remove(elected);
process::metrics::remove(slaves_active);
process::metrics::remove(slaves_inactive);
process::metrics::remove(frameworks_active);
process::metrics::remove(frameworks_inactive);
process::metrics::remove(outstanding_offers);
process::metrics::remove(tasks_staging);
process::metrics::remove(tasks_starting);
process::metrics::remove(tasks_running);
process::metrics::remove(tasks_finished);
process::metrics::remove(tasks_failed);
process::metrics::remove(tasks_killed);
process::metrics::remove(tasks_lost);
process::metrics::remove(dropped_messages);
// Messages from schedulers.
process::metrics::remove(messages_register_framework);
process::metrics::remove(messages_reregister_framework);
process::metrics::remove(messages_unregister_framework);
process::metrics::remove(messages_deactivate_framework);
process::metrics::remove(messages_kill_task);
process::metrics::remove(messages_status_update_acknowledgement);
process::metrics::remove(messages_resource_request);
process::metrics::remove(messages_launch_tasks);
process::metrics::remove(messages_revive_offers);
process::metrics::remove(messages_reconcile_tasks);
process::metrics::remove(messages_framework_to_executor);
// Messages from slaves.
process::metrics::remove(messages_register_slave);
process::metrics::remove(messages_reregister_slave);
process::metrics::remove(messages_unregister_slave);
process::metrics::remove(messages_status_update);
process::metrics::remove(messages_exited_executor);
// Messages from both schedulers and slaves.
process::metrics::remove(messages_authenticate);
process::metrics::remove(valid_framework_to_executor_messages);
process::metrics::remove(invalid_framework_to_executor_messages);
process::metrics::remove(valid_status_updates);
process::metrics::remove(invalid_status_updates);
process::metrics::remove(valid_status_update_acknowledgements);
process::metrics::remove(invalid_status_update_acknowledgements);
process::metrics::remove(recovery_slave_removals);
process::metrics::remove(event_queue_messages);
process::metrics::remove(event_queue_dispatches);
process::metrics::remove(event_queue_http_requests);
process::metrics::remove(slave_registrations);
process::metrics::remove(slave_reregistrations);
process::metrics::remove(slave_removals);
foreach (const process::metrics::Gauge& gauge, resources_total) {
process::metrics::remove(gauge);
}
resources_total.clear();
foreach (const process::metrics::Gauge& gauge, resources_used) {
process::metrics::remove(gauge);
}
resources_used.clear();
foreach (const process::metrics::Gauge& gauge, resources_percent) {
process::metrics::remove(gauge);
}
resources_percent.clear();
}
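// Sums the named scalar resource advertised by all registered slaves.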
double Master::_resources_total(const std::string& name)
{
double total = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
foreach (const Resource& resource, slave->info.resources()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
total += resource.scalar().value();
}
}
}
return total;
}
double Master::_resources_used(const std::string& name)
{
double used = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
foreach (const Resource& resource, slave->resourcesInUse) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
used += resource.scalar().value();
}
}
}
return used;
}
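// Fraction of the named resource currently in use; returns 0 when
// the total is 0 to avoid dividing by zero.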
double Master::_resources_percent(const std::string& name)
{
double total = _resources_total(name);
if (total == 0.0) {
return total;
} else {
return _resources_used(name) / total;
}
}
} // namespace master {
} // namespace internal {
} // namespace mesos {