// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdint.h>
#include <algorithm>
#include <cctype>
#include <fstream>
#include <functional>
#include <iomanip>
#include <list>
#include <memory>
#include <set>
#include <sstream>
#include <tuple>
#include <utility>
#include <mesos/module.hpp>
#include <mesos/roles.hpp>
#include <mesos/authentication/authenticator.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/allocator/allocator.hpp>
#include <mesos/master/contender.hpp>
#include <mesos/master/detector.hpp>
#include <mesos/module/authenticator.hpp>
#include <mesos/scheduler/scheduler.hpp>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/run.hpp>
#include <process/shared.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/ip.hpp>
#include <stout/lambda.hpp>
#include <stout/multihashmap.hpp>
#include <stout/net.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/unreachable.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "authentication/cram_md5/authenticator.hpp"
#include "common/build.hpp"
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resource_quantities.hpp"
#include "common/status_utils.hpp"
#include "credentials/credentials.hpp"
#include "hook/manager.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
#include "master/registry_operations.hpp"
#include "master/weights.hpp"
#include "module/manager.hpp"
#include "watcher/whitelist_watcher.hpp"
using google::protobuf::RepeatedPtrField;
using std::list;
using std::reference_wrapper;
using std::set;
using std::shared_ptr;
using std::string;
using std::tie;
using std::tuple;
using std::vector;
using process::await;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Clock;
using process::ExitedEvent;
using process::Failure;
using process::Future;
using process::MessageEvent;
using process::Owned;
using process::PID;
using process::Process;
using process::Promise;
using process::RateLimiter;
using process::Shared;
using process::Time;
using process::Timer;
using process::UPID;
using process::http::Pipe;
using process::http::authentication::Principal;
using process::metrics::Counter;
namespace mesos {
namespace internal {
namespace master {
using mesos::allocator::Allocator;
using mesos::authorization::createSubject;
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::VIEW_FRAMEWORK;
using mesos::authorization::VIEW_TASK;
using mesos::authorization::VIEW_EXECUTOR;
using mesos::master::contender::MasterContender;
using mesos::master::detector::MasterDetector;
using mesos::internal::ResourceQuantities;
static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo);
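// Observes a registered agent by periodically pinging it; if no pong
// arrives for `maxSlavePingTimeouts` consecutive pings, it schedules a
// (rate limited) transition of the agent to UNREACHABLE on the master.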
class SlaveObserver : public ProtobufProcess<SlaveObserver>
{
public:
SlaveObserver(const UPID& _slave,
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
const PID<Master>& _master,
const Option<shared_ptr<RateLimiter>>& _limiter,
const shared_ptr<Metrics>& _metrics,
const Duration& _slavePingTimeout,
const size_t _maxSlavePingTimeouts)
: ProcessBase(process::ID::generate("slave-observer")),
slave(_slave),
slaveInfo(_slaveInfo),
slaveId(_slaveId),
master(_master),
limiter(_limiter),
metrics(_metrics),
slavePingTimeout(_slavePingTimeout),
maxSlavePingTimeouts(_maxSlavePingTimeouts),
timeouts(0),
pinged(false),
connected(true)
{
install<PongSlaveMessage>(&SlaveObserver::pong);
}
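// Called by the master when the agent's connection state changes; the
// new state is advertised to the agent in the next ping.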
void reconnect()
{
connected = true;
}
void disconnect()
{
connected = false;
}
protected:
void initialize() override
{
ping();
}
void ping()
{
PingSlaveMessage message;
message.set_connected(connected);
send(slave, message);
pinged = true;
delay(slavePingTimeout, self(), &SlaveObserver::timeout);
}
void pong()
{
timeouts = 0;
pinged = false;
// Cancel any pending unreachable transitions.
if (markingUnreachable.isSome()) {
// Need a copy for non-const access.
Future<Nothing> future = markingUnreachable.get();
future.discard();
}
}
void timeout()
{
if (pinged) {
timeouts++; // No pong has been received before the timeout.
if (timeouts >= maxSlavePingTimeouts) {
// No pong has been received for the last
// 'maxSlavePingTimeouts' pings.
markUnreachable();
}
}
// NOTE: We keep pinging even if we schedule a transition to
// UNREACHABLE. This is because if the slave eventually responds
// to a ping, we can cancel the UNREACHABLE transition.
ping();
}
// Marking slaves unreachable is rate-limited and can be canceled if
// a pong is received before `_markUnreachable` is called.
//
// TODO(neilc): Using a rate-limit when marking slaves unreachable
// is only necessary for frameworks that are not PARTITION_AWARE.
// For such frameworks, we shut down their tasks when an unreachable
// agent reregisters, so a rate-limit is a useful safety
// precaution. Once all frameworks are PARTITION_AWARE, we can
// likely remove the rate-limit (MESOS-5948).
void markUnreachable()
{
if (markingUnreachable.isSome()) {
return; // Unreachable transition is already in progress.
}
Future<Nothing> acquire = Nothing();
if (limiter.isSome()) {
LOG(INFO) << "Scheduling transition of agent " << slaveId
<< " to UNREACHABLE because of health check timeout";
acquire = limiter.get()->acquire();
}
markingUnreachable = acquire.onAny(defer(self(), &Self::_markUnreachable));
++metrics->slave_unreachable_scheduled;
}
void _markUnreachable()
{
CHECK_SOME(markingUnreachable);
const Future<Nothing>& future = markingUnreachable.get();
CHECK(!future.isFailed());
if (future.isReady()) {
++metrics->slave_unreachable_completed;
dispatch(master,
&Master::markUnreachable,
slaveInfo,
false,
"health check timed out");
} else if (future.isDiscarded()) {
LOG(INFO) << "Canceling transition of agent " << slaveId
<< " to UNREACHABLE because a pong was received!";
++metrics->slave_unreachable_canceled;
}
markingUnreachable = None();
}
private:
const UPID slave;
const SlaveInfo slaveInfo;
const SlaveID slaveId;
const PID<Master> master;
const Option<shared_ptr<RateLimiter>> limiter;
shared_ptr<Metrics> metrics;
Option<Future<Nothing>> markingUnreachable;
const Duration slavePingTimeout;
const size_t maxSlavePingTimeouts;
uint32_t timeouts;
bool pinged;
bool connected;
};
Master::Master(
Allocator* _allocator,
Registrar* _registrar,
Files* _files,
MasterContender* _contender,
MasterDetector* _detector,
const Option<Authorizer*>& _authorizer,
const Option<shared_ptr<RateLimiter>>& _slaveRemovalLimiter,
const Flags& _flags)
: ProcessBase("master"),
flags(_flags),
http(this),
allocator(_allocator),
registrar(_registrar),
files(_files),
contender(_contender),
detector(_detector),
authorizer(_authorizer),
frameworks(flags),
subscribers(this),
authenticator(None()),
metrics(new Metrics(*this)),
electedTime(None())
{
slaves.limiter = _slaveRemovalLimiter;
// NOTE: We populate 'info_' here instead of inside 'initialize()'
// because 'StandaloneMasterDetector' needs access to the info.
// Master ID is generated randomly based on UUID.
info_.set_id(id::UUID::random().toString());
// NOTE: Currently, we store ip in MasterInfo in network order,
// which should be fixed. See MESOS-1201 for details.
// TODO(marco): The ip, port, hostname fields above are
// being deprecated; the code should be removed once
// the deprecation cycle is complete.
info_.set_ip(self().address.ip.in()->s_addr);
info_.set_port(self().address.port);
info_.set_pid(self());
info_.set_version(MESOS_VERSION);
for (const MasterInfo::Capability& capability : MASTER_CAPABILITIES()) {
info_.add_capabilities()->CopyFrom(capability);
}
// Determine our hostname or use the hostname provided.
string hostname;
if (flags.hostname.isNone()) {
if (flags.hostname_lookup) {
Try<string> result = net::getHostname(self().address.ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
}
hostname = result.get();
} else {
// We use the IP address for hostname if the user requested us
// NOT to look it up, and it wasn't explicitly set via --hostname:
hostname = stringify(self().address.ip);
}
} else {
hostname = flags.hostname.get();
}
info_.set_hostname(hostname);
// This uses the new `Address` message in `MasterInfo`.
info_.mutable_address()->set_ip(stringify(self().address.ip));
info_.mutable_address()->set_port(self().address.port);
info_.mutable_address()->set_hostname(hostname);
if (flags.domain.isSome()) {
info_.mutable_domain()->CopyFrom(flags.domain.get());
}
}
Master::~Master() {}
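// Returns the registry's minimum capabilities that this master does not
// advertise. An empty result means the master can safely recover the
// given registry.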
hashset<string> Master::missingMinimumCapabilities(
const MasterInfo& masterInfo, const Registry& registry)
{
if (registry.minimum_capabilities().size() == 0) {
return hashset<string>();
}
hashset<string> minimumCapabilities, masterCapabilities;
foreach (
const Registry::MinimumCapability& minimumCapability,
registry.minimum_capabilities()) {
minimumCapabilities.insert(minimumCapability.capability());
}
foreach (
const MasterInfo::Capability& masterCapability,
masterInfo.capabilities()) {
masterCapabilities.insert(
MasterInfo::Capability::Type_Name(masterCapability.type()));
}
return minimumCapabilities - masterCapabilities;
}
// TODO(vinod): Update this interface to return failed futures when
// capacity is reached.
struct BoundedRateLimiter
{
BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
: limiter(new process::RateLimiter(qps)),
capacity(_capacity),
messages(0) {}
process::Owned<process::RateLimiter> limiter;
const Option<uint64_t> capacity;
// Number of outstanding messages for this RateLimiter.
// NOTE: ExitedEvents are throttled but not counted towards
// the capacity here.
uint64_t messages;
};
void Master::initialize()
{
LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
<< " started on " << string(self()).substr(7);
LOG(INFO) << "Flags at startup: " << flags;
if (process::address().ip.isLoopback()) {
LOG(WARNING) << "\n**************************************************\n"
<< "Master bound to loopback interface!"
<< " Cannot communicate with remote schedulers or agents."
<< " You might want to set '--ip' flag to a routable"
<< " IP address.\n"
<< "**************************************************";
}
// NOTE: We enforce a minimum slave reregister timeout because the
// slave bounds its (re-)registration retries based on the minimum.
if (flags.agent_reregister_timeout < MIN_AGENT_REREGISTER_TIMEOUT) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.agent_reregister_timeout << "'"
<< " for --agent_reregister_timeout:"
<< " Must be at least " << MIN_AGENT_REREGISTER_TIMEOUT;
}
// Parse the percentage for the slave removal limit.
// TODO(bmahler): Add a 'Percentage' abstraction.
if (!strings::endsWith(flags.recovery_agent_removal_limit, "%")) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.recovery_agent_removal_limit << "'"
<< " for --recovery_agent_removal_percent_limit: " << "missing '%'";
}
Try<double> limit = numify<double>(
strings::remove(
flags.recovery_agent_removal_limit,
"%",
strings::SUFFIX));
if (limit.isError()) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.recovery_agent_removal_limit << "'"
<< " for --recovery_agent_removal_percent_limit: " << limit.error();
}
if (limit.get() < 0.0 || limit.get() > 100.0) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.recovery_agent_removal_limit << "'"
<< " for --recovery_agent_removal_percent_limit:"
<< " Must be within [0%-100%]";
}
// Log authentication state.
if (flags.authenticate_frameworks) {
LOG(INFO) << "Master only allowing authenticated frameworks to register";
} else {
LOG(INFO) << "Master allowing unauthenticated frameworks to register";
}
if (flags.authenticate_agents) {
LOG(INFO) << "Master only allowing authenticated agents to register";
} else {
LOG(INFO) << "Master allowing unauthenticated agents to register";
}
if (flags.authenticate_http_frameworks) {
LOG(INFO) << "Master only allowing authenticated HTTP frameworks to "
<< "register";
} else {
LOG(INFO) << "Master allowing HTTP frameworks to register without "
<< "authentication";
}
// Load credentials.
Option<Credentials> credentials;
if (flags.credentials.isSome()) {
Result<Credentials> _credentials =
credentials::read(flags.credentials.get());
if (_credentials.isError()) {
EXIT(EXIT_FAILURE) << _credentials.error() << " (see --credentials flag)";
} else if (_credentials.isNone()) {
EXIT(EXIT_FAILURE)
<< "Credentials file must contain at least one credential"
<< " (see --credentials flag)";
}
// Store credentials in master to use them in routes.
credentials = _credentials.get();
}
// Extract authenticator names and validate them.
authenticatorNames = strings::split(flags.authenticators, ",");
if (authenticatorNames.empty()) {
EXIT(EXIT_FAILURE) << "No authenticator specified";
}
if (authenticatorNames.size() > 1) {
EXIT(EXIT_FAILURE) << "Multiple authenticators not supported";
}
if (authenticatorNames[0] != DEFAULT_AUTHENTICATOR &&
!modules::ModuleManager::contains<Authenticator>(
authenticatorNames[0])) {
EXIT(EXIT_FAILURE)
<< "Authenticator '" << authenticatorNames[0] << "' not found."
<< " Check the spelling (compare to '" << DEFAULT_AUTHENTICATOR << "')"
<< " or verify that the authenticator was loaded successfully"
<< " (see --modules)";
}
// TODO(tillt): Allow multiple authenticators to be loaded and enable
// the authenticatee to select the appropriate one. See MESOS-1939.
if (authenticatorNames[0] == DEFAULT_AUTHENTICATOR) {
LOG(INFO) << "Using default '" << DEFAULT_AUTHENTICATOR
<< "' authenticator";
authenticator = new cram_md5::CRAMMD5Authenticator();
} else {
Try<Authenticator*> module =
modules::ModuleManager::create<Authenticator>(authenticatorNames[0]);
if (module.isError()) {
EXIT(EXIT_FAILURE)
<< "Could not create authenticator module '"
<< authenticatorNames[0] << "': " << module.error();
}
LOG(INFO) << "Using '" << authenticatorNames[0] << "' authenticator";
authenticator = module.get();
}
// Give Authenticator access to credentials when needed.
CHECK_SOME(authenticator);
Try<Nothing> initialize = authenticator.get()->initialize(credentials);
if (initialize.isError()) {
const string error =
"Failed to initialize authenticator '" + authenticatorNames[0] +
"': " + initialize.error();
if (flags.authenticate_frameworks || flags.authenticate_agents) {
EXIT(EXIT_FAILURE)
<< "Failed to start master with authentication enabled: " << error;
} else {
// A failure to initialize the authenticator leads to unusable
// authentication but still allows non-authenticating frameworks
// and slaves to connect.
LOG(WARNING) << "Only non-authenticating frameworks and agents are "
<< "allowed to connect. "
<< "Authentication is disabled: " << error;
delete authenticator.get();
authenticator = None();
}
}
if (flags.authenticate_http_readonly) {
Try<Nothing> result = initializeHttpAuthenticators(
READONLY_HTTP_AUTHENTICATION_REALM,
strings::split(flags.http_authenticators, ","),
credentials);
if (result.isError()) {
EXIT(EXIT_FAILURE) << result.error();
}
}
if (flags.authenticate_http_readwrite) {
Try<Nothing> result = initializeHttpAuthenticators(
READWRITE_HTTP_AUTHENTICATION_REALM,
strings::split(flags.http_authenticators, ","),
credentials);
if (result.isError()) {
EXIT(EXIT_FAILURE) << result.error();
}
}
if (flags.authenticate_http_frameworks) {
// The `--http_framework_authenticators` flag should always be set when HTTP
// framework authentication is enabled.
if (flags.http_framework_authenticators.isNone()) {
EXIT(EXIT_FAILURE)
<< "Missing `--http_framework_authenticators` flag. This must be used "
<< "in conjunction with `--authenticate_http_frameworks`";
}
Try<Nothing> result = initializeHttpAuthenticators(
DEFAULT_HTTP_FRAMEWORK_AUTHENTICATION_REALM,
strings::split(flags.http_framework_authenticators.get(), ","),
credentials);
if (result.isError()) {
EXIT(EXIT_FAILURE) << result.error();
}
}
if (authorizer.isSome()) {
LOG(INFO) << "Authorization enabled";
}
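// The `--rate_limits` flag carries a `RateLimits` message: per-principal
// `limits` entries plus an optional `aggregate_default_qps` and
// `aggregate_default_capacity`, typically given as JSON. An illustrative
// (not authoritative) value such as:
//   {"limits": [{"principal": "foo", "qps": 55.5, "capacity": 100},
//               {"principal": "bar"}],
//    "aggregate_default_qps": 33.3}
// would throttle "foo", leave "bar" unthrottled (no qps set), and
// throttle other registered frameworks with the aggregate default
// limiter.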
if (flags.rate_limits.isSome()) {
// Add framework rate limiters.
foreach (const RateLimit& limit_, flags.rate_limits->limits()) {
if (frameworks.limiters.contains(limit_.principal())) {
EXIT(EXIT_FAILURE)
<< "Duplicate principal " << limit_.principal()
<< " found in RateLimits configuration";
}
if (limit_.has_qps() && limit_.qps() <= 0) {
EXIT(EXIT_FAILURE)
<< "Invalid qps: " << limit_.qps()
<< ". It must be a positive number";
}
if (limit_.has_qps()) {
Option<uint64_t> capacity;
if (limit_.has_capacity()) {
capacity = limit_.capacity();
}
frameworks.limiters.put(
limit_.principal(),
Owned<BoundedRateLimiter>(
new BoundedRateLimiter(limit_.qps(), capacity)));
} else {
frameworks.limiters.put(limit_.principal(), None());
}
}
if (flags.rate_limits->has_aggregate_default_qps() &&
flags.rate_limits->aggregate_default_qps() <= 0) {
EXIT(EXIT_FAILURE)
<< "Invalid aggregate_default_qps: "
<< flags.rate_limits->aggregate_default_qps()
<< ". It must be a positive number";
}
if (flags.rate_limits->has_aggregate_default_qps()) {
Option<uint64_t> capacity;
if (flags.rate_limits->has_aggregate_default_capacity()) {
capacity = flags.rate_limits->aggregate_default_capacity();
}
frameworks.defaultLimiter =
Owned<BoundedRateLimiter>(new BoundedRateLimiter(
flags.rate_limits->aggregate_default_qps(), capacity));
}
LOG(INFO) << "Framework rate limiting enabled";
}
// If the rate limiter is injected for testing,
// the flag may not be set.
if (slaves.limiter.isSome() && flags.agent_removal_rate_limit.isSome()) {
LOG(INFO) << "Agent removal is rate limited to "
<< flags.agent_removal_rate_limit.get();
}
// If "--roles" is set, configure the role whitelist.
// TODO(neilc): Remove support for explicit roles in ~Mesos 0.32.
if (flags.roles.isSome()) {
LOG(WARNING) << "The '--roles' flag is deprecated. This flag will be "
<< "removed in the future. See the Mesos 0.27 upgrade "
<< "notes for more information";
Try<vector<string>> roles = roles::parse(flags.roles.get());
if (roles.isError()) {
EXIT(EXIT_FAILURE) << "Failed to parse roles: " << roles.error();
}
roleWhitelist = hashset<string>();
foreach (const string& role, roles.get()) {
roleWhitelist->insert(role);
}
if (roleWhitelist->size() < roles->size()) {
LOG(WARNING) << "Duplicate values in '--roles': " << flags.roles.get();
}
// The default role is always allowed.
roleWhitelist->insert("*");
}
// Add role weights.
if (flags.weights.isSome()) {
vector<string> tokens = strings::tokenize(flags.weights.get(), ",");
foreach (const string& token, tokens) {
vector<string> pair = strings::tokenize(token, "=");
if (pair.size() != 2) {
EXIT(EXIT_FAILURE)
<< "Invalid weight: '" << token << "'. --weights should"
<< " be of the form 'role=weight,role=weight'";
} else if (!isWhitelistedRole(pair[0])) {
EXIT(EXIT_FAILURE)
<< "Invalid weight: '" << token << "'. " << pair[0]
<< " is not a valid role";
}
double weight = atof(pair[1].c_str());
if (weight <= 0) {
EXIT(EXIT_FAILURE)
<< "Invalid weight: '" << token << "'. Weights must be positive";
}
weights[pair[0]] = weight;
}
}
// Verify the timeout is greater than zero.
if (flags.offer_timeout.isSome() &&
flags.offer_timeout.get() <= Duration::zero()) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.offer_timeout.get() << "'"
<< " for --offer_timeout: Must be greater than zero";
}
// Parse min_allocatable_resources.
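// Each '|'-separated token is parsed as a set of resource quantities,
// e.g. (illustrative) "cpus:0.01|mem:32".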
vector<ResourceQuantities> minAllocatableResources;
foreach (
const string& token,
strings::tokenize(flags.min_allocatable_resources, "|")) {
Try<ResourceQuantities> resourceQuantities =
ResourceQuantities::fromString(token);
if (resourceQuantities.isError()) {
EXIT(EXIT_FAILURE) << "Error parsing min_allocatable_resources '"
<< flags.min_allocatable_resources
<< "': " << resourceQuantities.error();
}
// We check the configuration against first-class resources and warn
// against possible misconfiguration (e.g. a typo).
set<string> firstClassResources = {"cpus", "mem", "disk", "ports", "gpus"};
for (auto it = resourceQuantities->begin(); it != resourceQuantities->end();
++it) {
if (firstClassResources.count(it->first) == 0) {
LOG(WARNING) << "Non-first-class resource '" << it->first
<< "' is configured as part of min_allocatable_resources";
}
}
minAllocatableResources.push_back(resourceQuantities.get());
}
// Initialize the allocator.
allocator->initialize(
flags.allocation_interval,
defer(self(), &Master::offer, lambda::_1, lambda::_2),
defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2),
flags.fair_sharing_excluded_resource_names,
flags.filter_gpu_resources,
flags.domain,
minAllocatableResources,
flags.max_completed_frameworks);
// Parse the whitelist. Passing Allocator::updateWhitelist()
// callback is safe because we shut down the whitelistWatcher in
// Master::finalize(), while allocator lifetime is greater than
// masters. Therefore there is no risk of calling into an allocator
// that has been cleaned up.
whitelistWatcher = new WhitelistWatcher(
flags.whitelist,
WHITELIST_WATCH_INTERVAL,
[this](const Option<hashset<string>>& whitelist) {
return allocator->updateWhitelist(whitelist);
});
spawn(whitelistWatcher);
nextFrameworkId = 0;
nextSlaveId = 0;
nextOfferId = 0;
startTime = Clock::now();
install<scheduler::Call>(&Master::receive);
// Install handler functions for certain messages.
install<SubmitSchedulerRequest>(
&Master::submitScheduler,
&SubmitSchedulerRequest::name);
install<RegisterFrameworkMessage>(
&Master::registerFramework);
install<ReregisterFrameworkMessage>(
&Master::reregisterFramework);
install<UnregisterFrameworkMessage>(
&Master::unregisterFramework,
&UnregisterFrameworkMessage::framework_id);
install<DeactivateFrameworkMessage>(
&Master::deactivateFramework,
&DeactivateFrameworkMessage::framework_id);
install<ResourceRequestMessage>(
&Master::resourceRequest,
&ResourceRequestMessage::framework_id,
&ResourceRequestMessage::requests);
install<LaunchTasksMessage>(
&Master::launchTasks);
install<ReviveOffersMessage>(
&Master::reviveOffers,
&ReviveOffersMessage::framework_id,
&ReviveOffersMessage::roles);
install<KillTaskMessage>(
&Master::killTask,
&KillTaskMessage::framework_id,
&KillTaskMessage::task_id);
install<StatusUpdateAcknowledgementMessage>(
&Master::statusUpdateAcknowledgement);
install<FrameworkToExecutorMessage>(
&Master::schedulerMessage);
install<RegisterSlaveMessage>(
&Master::registerSlave);
install<ReregisterSlaveMessage>(
&Master::reregisterSlave);
install<UnregisterSlaveMessage>(
&Master::unregisterSlave,
&UnregisterSlaveMessage::slave_id);
install<StatusUpdateMessage>(
&Master::statusUpdate);
// Added in 0.24.0 to support HTTP schedulers. Since
// these do not have a pid, the slave must forward
// messages through the master.
install<ExecutorToFrameworkMessage>(
&Master::executorMessage);
install<ReconcileTasksMessage>(
&Master::reconcileTasks);
install<UpdateOperationStatusMessage>(
&Master::updateOperationStatus);
install<ExitedExecutorMessage>(
&Master::exitedExecutor,
&ExitedExecutorMessage::slave_id,
&ExitedExecutorMessage::framework_id,
&ExitedExecutorMessage::executor_id,
&ExitedExecutorMessage::status);
install<UpdateSlaveMessage>(&Master::updateSlave);
install<AuthenticateMessage>(
&Master::authenticate,
&AuthenticateMessage::pid);
// Setup HTTP routes.
route("/api/v1",
// TODO(benh): Is this authentication realm sufficient or do
// we need some kind of hybrid if we expect both schedulers
// and operators/tooling to use this endpoint?
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::API_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.api(request, principal);
});
route("/api/v1/scheduler",
DEFAULT_HTTP_FRAMEWORK_AUTHENTICATION_REALM,
Http::SCHEDULER_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.scheduler(request, principal);
});
route("/create-volumes",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::CREATE_VOLUMES_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.createVolumes(request, principal);
});
route("/destroy-volumes",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::DESTROY_VOLUMES_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.destroyVolumes(request, principal);
});
route("/frameworks",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::FRAMEWORKS_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.frameworks(request, principal);
});
route("/flags",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::FLAGS_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.flags(request, principal);
});
route("/health",
Http::HEALTH_HELP(),
[this](const process::http::Request& request) {
return http.health(request);
});
route("/redirect",
Http::REDIRECT_HELP(),
[this](const process::http::Request& request) {
return http.redirect(request);
});
route("/reserve",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::RESERVE_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.reserve(request, principal);
});
// TODO(ijimenez): Remove this endpoint at the end of the
// deprecation cycle on 0.26.
route("/roles.json",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::ROLES_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.roles(request, principal);
});
route("/roles",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::ROLES_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.roles(request, principal);
});
route("/teardown",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::TEARDOWN_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.teardown(request, principal);
});
route("/slaves",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::SLAVES_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.slaves(request, principal);
});
// TODO(ijimenez): Remove this endpoint at the end of the
// deprecation cycle on 0.26.
route("/state.json",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::STATE_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.state(request, principal);
});
route("/state",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::STATE_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.state(request, principal);
});
route("/state-summary",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::STATESUMMARY_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.stateSummary(request, principal);
});
// TODO(ijimenez): Remove this endpoint at the end of the
// deprecation cycle.
route("/tasks.json",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::TASKS_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.tasks(request, principal);
});
route("/tasks",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::TASKS_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.tasks(request, principal);
});
route("/maintenance/schedule",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::MAINTENANCE_SCHEDULE_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.maintenanceSchedule(request, principal);
});
route("/maintenance/status",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::MAINTENANCE_STATUS_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.maintenanceStatus(request, principal);
});
route("/machine/down",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::MACHINE_DOWN_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.machineDown(request, principal);
});
route("/machine/up",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::MACHINE_UP_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.machineUp(request, principal);
});
route("/unreserve",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::UNRESERVE_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.unreserve(request, principal);
});
route("/quota",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::QUOTA_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.quota(request, principal);
});
route("/weights",
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::WEIGHTS_HELP(),
[this](const process::http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.weights(request, principal);
});
// Provide HTTP assets from a "webui" directory. This is either
// specified via flags (which is necessary for running out of the
// build directory before 'make install') or determined at build
// time via the preprocessor macro '-DMESOS_WEBUI_DIR' set in the
// Makefile.
provide("", path::join(flags.webui_dir, "index.html"));
provide("app", path::join(flags.webui_dir, "app"));
provide("assets", path::join(flags.webui_dir, "assets"));
const PID<Master> masterPid = self();
auto authorize = [masterPid](const Option<Principal>& principal) {
return dispatch(masterPid, &Master::authorizeLogAccess, principal);
};
// Expose the log file for the webui. Fall back to 'log_dir' if
// an explicit file was not specified.
if (flags.external_log_file.isSome()) {
files->attach(flags.external_log_file.get(), "/master/log", authorize)
.onAny(defer(self(),
&Self::fileAttached,
lambda::_1,
flags.external_log_file.get()));
} else if (flags.log_dir.isSome()) {
Try<string> log = logging::getLogFile(
logging::getLogSeverity(flags.logging_level));
if (log.isError()) {
LOG(ERROR) << "Master log file cannot be found: " << log.error();
} else {
files->attach(log.get(), "/master/log", authorize)
.onAny(defer(self(), &Self::fileAttached, lambda::_1, log.get()));
}
}
contender->initialize(info_);
// Start contending to be a leading master and detecting the current
// leader.
contender->contend()
.onAny(defer(self(), &Master::contended, lambda::_1));
detector->detect()
.onAny(defer(self(), &Master::detected, lambda::_1));
}
void Master::finalize()
{
LOG(INFO) << "Master terminating";
// NOTE: Even though we remove the slave and framework from the
// allocator, it is possible that offers are already dispatched to
// this master. In tests, if a new master (with the same PID) is
// started, it might process the offers from the old master's
// allocator.
// TODO(vinod): Fix the above race by changing the allocator
// interface to return a stream of offer events.
// Remove the slaves.
foreachvalue (Slave* slave, slaves.registered) {
// We first remove the slave from the allocator so that any
// recovered resources below are not reoffered.
allocator->removeSlave(slave->id);
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
removeTask(task);
}
}
// Remove executors.
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
foreachkey (const ExecutorID& executorId,
utils::copy(slave->executors[frameworkId])) {
removeExecutor(slave, frameworkId, executorId);
}
}
// Remove offers.
foreach (Offer* offer, utils::copy(slave->offers)) {
removeOffer(offer);
}
// Remove inverse offers.
foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
// We don't need to update the allocator because the slave has already
// been removed.
removeInverseOffer(inverseOffer);
}
// Remove pending tasks from the slave. Don't bother
// recovering the resources in the allocator.
slave->pendingTasks.clear();
// Terminate the slave observer.
terminate(slave->observer);
wait(slave->observer);
delete slave->observer;
delete slave;
}
slaves.registered.clear();
// Remove the frameworks.
// Note we are not deleting the pointers to the frameworks from the
// roles because it is unnecessary bookkeeping at this point since
// we are shutting down.
foreachvalue (Framework* framework, frameworks.registered) {
allocator->removeFramework(framework->id());
// Remove pending tasks from the framework. Don't bother
// recovering the resources in the allocator.
framework->pendingTasks.clear();
// No tasks/executors/offers should remain since the slaves
// have been removed.
CHECK(framework->tasks.empty());
CHECK(framework->executors.empty());
CHECK(framework->offers.empty());
CHECK(framework->inverseOffers.empty());
delete framework;
}
frameworks.registered.clear();
CHECK(offers.empty());
CHECK(inverseOffers.empty());
foreachvalue (Future<Option<string>> future, authenticating) {
// NOTE: This is necessary during tests because a copy of
// this future is used to setup authentication timeout. If a
// test doesn't discard this future, authentication timeout might
// fire in a different test and any associated callbacks
// (e.g., '_authenticate()') would be called. This is because the
// master pid doesn't change across the tests.
// TODO(vinod): This seems to be a bug in libprocess or the
// testing infrastructure.
future.discard();
}
foreachvalue (Role* role, roles) {
delete role;
}
roles.clear();
// NOTE: This is necessary during tests because we don't want the
// timer to fire in a different test and invoke the callback.
// The callback would be invoked because the master pid doesn't
// change across the tests.
// TODO(vinod): This seems to be a bug in libprocess or the
// testing infrastructure.
if (slaves.recoveredTimer.isSome()) {
Clock::cancel(slaves.recoveredTimer.get());
}
if (registryGcTimer.isSome()) {
Clock::cancel(registryGcTimer.get());
}
terminate(whitelistWatcher);
wait(whitelistWatcher);
delete whitelistWatcher;
if (authenticator.isSome()) {
delete authenticator.get();
}
}
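// Handles the disconnection of an HTTP framework's event stream
// connection.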
void Master::exited(const FrameworkID& frameworkId, const HttpConnection& http)
{
foreachvalue (Framework* framework, frameworks.registered) {
if (framework->http.isSome() && framework->http->writer == http.writer) {
CHECK_EQ(frameworkId, framework->id());
_exited(framework);
return;
}
// If the framework has reconnected, the writer will not match
// above, and we will have a framework with a matching id.
if (frameworkId == framework->id()) {
LOG(INFO) << "Ignoring disconnection for framework "
<< *framework << " as it has already reconnected";
return;
}
}
}
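// Handles a libprocess exited notification for a PID, which may belong
// to a driver-based scheduler or to a registered agent.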
void Master::exited(const UPID& pid)
{
foreachvalue (Framework* framework, frameworks.registered) {
if (framework->pid == pid) {
// See comments in `receive()` on why we send an error message
// to the framework upon detecting a disconnection.
FrameworkErrorMessage message;
message.set_message("Framework disconnected");
framework->send(message);
_exited(framework);
return;
}
}
if (Slave* slave = slaves.registered.get(pid)) {
LOG(INFO) << "Agent " << *slave << " disconnected";
if (slave->connected) {
disconnect(slave);
// The semantics when a registered slave gets disconnected are as
// follows for each framework running on that slave:
//
// 1) If the framework is checkpointing: No immediate action is
// taken. The slave is given a chance to reconnect until the
// slave observer times out (75s) and removes the slave.
//
// 2) If the framework is not-checkpointing: The slave is not
// removed but the framework is removed from the slave's
// structs, its tasks transitioned to LOST and resources
// recovered.
hashset<FrameworkID> frameworkIds =
slave->tasks.keys() | slave->executors.keys();
foreach (const FrameworkID& frameworkId, frameworkIds) {
Framework* framework = getFramework(frameworkId);
CHECK_NOTNULL(framework);
if (!framework->info.checkpoint()) {
LOG(INFO) << "Removing framework " << *framework
<< " from disconnected agent " << *slave
<< " because the framework is not checkpointing";
removeFramework(slave, framework);
}
}
// If the master -> agent socket breaks, we expect that either
// (a) the agent will fail to respond to pings and be marked
// unreachable, or (b) the agent will receive a ping, notice the
// master thinks it is disconnected, and then reregister. There
// is a third possibility: if the agent restarts but hangs
// during agent recovery, it will respond to pings but never
// attempt to reregister (MESOS-6286).
//
// To handle this case, we expect that an agent whose socket has
// broken will reregister within `agent_reregister_timeout`. If
// the agent doesn't reregister, it is marked unreachable.
slave->reregistrationTimer =
delay(flags.agent_reregister_timeout,
self(),
&Master::agentReregisterTimeout,
slave->id);
} else {
// NOTE: A duplicate exited() event is possible for a slave
// because its PID doesn't change on restart. See MESOS-675
// for details.
LOG(WARNING) << "Ignoring duplicate exited() notification for "
<< "agent " << *slave;
}
}
}
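// Invoked when a disconnected agent has not reregistered within
// `agent_reregister_timeout`; schedules marking it unreachable, subject
// to the agent removal rate limiter.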
void Master::agentReregisterTimeout(const SlaveID& slaveId)
{
Slave* slave = slaves.registered.get(slaveId);
// The slave might have been removed or reregistered concurrently
// with the timeout expiring.
if (slave == nullptr || slave->connected) {
return;
}
// Remove the slave in a rate limited manner, similar to how the
// SlaveObserver removes slaves.
Future<Nothing> acquire = Nothing();
if (slaves.limiter.isSome()) {
LOG(INFO) << "Scheduling removal of agent "
<< *slave
<< "; did not reregister within "
<< flags.agent_reregister_timeout << " after disconnecting";
acquire = slaves.limiter.get()->acquire();
}
acquire
.then(defer(self(), &Self::_agentReregisterTimeout, slaveId));
++metrics->slave_unreachable_scheduled;
}
Nothing Master::_agentReregisterTimeout(const SlaveID& slaveId)
{
Slave* slave = slaves.registered.get(slaveId);
// The slave might have been removed or reregistered while we were
// waiting to acquire the rate limit.
if (slave == nullptr || slave->connected) {
++metrics->slave_unreachable_canceled;
return Nothing();
}
++metrics->slave_unreachable_completed;
markUnreachable(
slave->info,
false,
"agent did not reregister within " +
stringify(flags.agent_reregister_timeout) +
" after disconnecting");
return Nothing();
}
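// Common handler for a framework disconnection: marks the framework
// disconnected and starts its failover timeout.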
void Master::_exited(Framework* framework)
{
LOG(INFO) << "Framework " << *framework << " disconnected";
// Disconnect the framework.
if (framework->connected()) {
disconnect(framework);
}
// We can assume framework's failover_timeout is valid
// because it has been validated in framework subscription.
Try<Duration> failoverTimeout_ =
Duration::create(framework->info.failover_timeout());
CHECK_SOME(failoverTimeout_);
Duration failoverTimeout = failoverTimeout_.get();
LOG(INFO) << "Giving framework " << *framework << " "
<< failoverTimeout << " to failover";
// Delay dispatching a message to ourselves for the timeout.
delay(failoverTimeout,
self(),
&Master::frameworkFailoverTimeout,
framework->id(),
framework->reregisteredTime);
}
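// Authorizes the given principal to access the master log exposed at
// '/master/log'; access is always granted when no authorizer is
// configured.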
Future<bool> Master::authorizeLogAccess(const Option<Principal>& principal)
{
if (authorizer.isNone()) {
return true;
}
authorization::Request request;
request.set_action(authorization::ACCESS_MESOS_LOG);
Option<authorization::Subject> subject = createSubject(principal);
if (subject.isSome()) {
request.mutable_subject()->CopyFrom(subject.get());
}
return authorizer.get()->authorized(request);
}
void Master::consume(MessageEvent&& event)
{
// There are three cases about the message's UPID with respect to
// 'frameworks.principals':
// 1) if a <UPID, principal> pair exists and the principal is Some,
// it's a framework with its principal specified.
// 2) if a <UPID, principal> pair exists and the principal is None,
// it's a framework without a principal.
// 3) if a <UPID, principal> pair does not exist in the map, it's
// either an unregistered framework or not a framework.
// The logic for framework message counters and rate limiting is
// mainly concerned with whether the UPID is a *registered*
// framework and whether the framework has a principal, so we use
// these two temp variables to simplify the condition checks below.
bool isRegisteredFramework =
frameworks.principals.contains(event.message.from);
const Option<string> principal = isRegisteredFramework
? frameworks.principals[event.message.from]
: Option<string>::none();
// Increment the "message_received" counter if the message is from
// a framework and such a counter is configured for it.
// See comments for 'Master::Metrics::Frameworks' and
// 'Master::Frameworks::principals' for details.
if (principal.isSome()) {
// If the framework has a principal, the counter must exist.
CHECK(metrics->frameworks.contains(principal.get()));
Counter messages_received =
metrics->frameworks.get(principal.get()).get()->messages_received;
++messages_received;
}
// All messages are filtered when non-leading.
if (!elected()) {
VLOG(1) << "Dropping '" << event.message.name << "' message since "
<< "not elected yet";
++metrics->dropped_messages;
return;
}
CHECK_SOME(recovered);
// All messages are filtered while recovering.
// TODO(bmahler): Consider instead re-enqueing *all* messages
// through recover(). What are the performance implications of
// the additional queueing delay and the accumulated backlog
// of messages post-recovery?
if (!recovered->isReady()) {
VLOG(1) << "Dropping '" << event.message.name << "' message since "
<< "not recovered yet";
++metrics->dropped_messages;
return;
}
// Throttle the message if it's a framework message and a
// RateLimiter is configured for the framework's principal.
// The framework is throttled by the default RateLimiter if:
// 1) the default RateLimiter is configured (and)
// 2) the framework doesn't have a principal or its principal is
// not specified in 'flags.rate_limits'.
// The framework is not throttled if:
// 1) the default RateLimiter is not configured to handle case 2)
// above. (or)
// 2) the principal exists in RateLimits but 'qps' is not set.
if (principal.isSome() &&
frameworks.limiters.contains(principal.get()) &&
frameworks.limiters[principal.get()].isSome()) {
const Owned<BoundedRateLimiter>& limiter =
frameworks.limiters[principal.get()].get();
if (limiter->capacity.isNone() ||
limiter->messages < limiter->capacity.get()) {
limiter->messages++;
limiter->limiter->acquire()
.onReady(defer(self(), &Self::throttled, std::move(event), principal));
} else {
exceededCapacity(
event,
principal,
limiter->capacity.get());
}
} else if ((principal.isNone() ||
!frameworks.limiters.contains(principal.get())) &&
isRegisteredFramework &&
frameworks.defaultLimiter.isSome()) {
if (frameworks.defaultLimiter.get()->capacity.isNone() ||
frameworks.defaultLimiter.get()->messages <
frameworks.defaultLimiter.get()->capacity.get()) {
frameworks.defaultLimiter.get()->messages++;
frameworks.defaultLimiter.get()->limiter->acquire()
.onReady(defer(self(), &Self::throttled, std::move(event), None()));
} else {
exceededCapacity(
event,
principal,
frameworks.defaultLimiter.get()->capacity.get());
}
} else {
_consume(std::move(event));
}
}
void Master::consume(ExitedEvent&& event)
{
// See comments in 'consume(MessageEvent&& event)' for which
// RateLimiter is used to throttle this UPID and when it is not
// throttled.
// Note that throttling ExitedEvent is necessary so the order
// between MessageEvents and ExitedEvents from the same PID is
// maintained. Also ExitedEvents are not subject to the capacity.
bool isRegisteredFramework = frameworks.principals.contains(event.pid);
const Option<string> principal = isRegisteredFramework
? frameworks.principals[event.pid]
: Option<string>::none();
// Necessary to disambiguate below.
typedef void(Self::*F)(ExitedEvent&&);
if (principal.isSome() &&
frameworks.limiters.contains(principal.get()) &&
frameworks.limiters[principal.get()].isSome()) {
frameworks.limiters[principal.get()].get()->limiter->acquire().onReady(
defer(self(), static_cast<F>(&Self::_consume), std::move(event)));
} else if ((principal.isNone() ||
!frameworks.limiters.contains(principal.get())) &&
isRegisteredFramework &&
frameworks.defaultLimiter.isSome()) {
frameworks.defaultLimiter.get()->limiter->acquire().onReady(
defer(self(), static_cast<F>(&Self::_consume), std::move(event)));
} else {
_consume(std::move(event));
}
}
// TODO(greggomann): Change this to accept an `Option<Principal>`
// when MESOS-7202 is resolved.
void Master::throttled(
MessageEvent&& event,
const Option<string>& principal)
{
// We already know a RateLimiter is used to throttle this event so
// here we only need to determine which.
if (principal.isSome()) {
CHECK_SOME(frameworks.limiters[principal.get()]);
frameworks.limiters[principal.get()].get()->messages--;
} else {
CHECK_SOME(frameworks.defaultLimiter);
frameworks.defaultLimiter.get()->messages--;
}
_consume(std::move(event));
}
void Master::_consume(MessageEvent&& event)
{
// Obtain the principal before processing the Message because the
// mapping may be deleted in handling 'UnregisterFrameworkMessage'
// but its counter still needs to be incremented for this message.
const Option<string> principal =
frameworks.principals.contains(event.message.from)
? frameworks.principals[event.message.from]
: Option<string>::none();
ProtobufProcess<Master>::consume(std::move(event));
// Increment 'messages_processed' counter if it still exists.
// Note that it could be removed in handling
// 'UnregisterFrameworkMessage' if it's the last framework with
// this principal.
if (principal.isSome() && metrics->frameworks.contains(principal.get())) {
Counter messages_processed =
metrics->frameworks.get(principal.get()).get()->messages_processed;
++messages_processed;
}
}
// TODO(greggomann): Change this to accept an `Option<Principal>`
// when MESOS-7202 is resolved.
void Master::exceededCapacity(
const MessageEvent& event,
const Option<string>& principal,
uint64_t capacity)
{
LOG(WARNING) << "Dropping message " << event.message.name << " from "
<< event.message.from
<< (principal.isSome() ? "(" + principal.get() + ")" : "")
<< ": capacity(" << capacity << ") exceeded";
// Send an error to the framework which will abort the scheduler
// driver.
// NOTE: The scheduler driver will send back a
// DeactivateFrameworkMessage which may be dropped as well but this
// should be fine because the scheduler is already informed of an
// unrecoverable error and should take action to recover.
FrameworkErrorMessage message;
message.set_message(
"Message " + event.message.name +
" dropped: capacity(" + stringify(capacity) + ") exceeded");
send(event.message.from, message);
}
void Master::_consume(ExitedEvent&& event)
{
Process<Master>::consume(std::move(event));
}
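// Helper used as a failure callback below to abort the master when a
// future it depends on fails or is discarded.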
void fail(const string& message, const string& failure)
{
LOG(FATAL) << message << ": " << failure;
}
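// Kicks off (or returns the already in-progress) recovery of persistent
// state from the registrar; only the elected leading master may recover.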
Future<Nothing> Master::recover()
{
if (!elected()) {
return Failure("Not elected as leading master");
}
if (recovered.isNone()) {
LOG(INFO) << "Recovering from registrar";
recovered = registrar->recover(info_)
.then(defer(self(), &Self::_recover, lambda::_1));
}
return recovered.get();
}
Future<Nothing> Master::_recover(const Registry& registry)
{
hashset<string> missingCapabilities =
missingMinimumCapabilities(info_, registry);
if (!missingCapabilities.empty()) {
LOG(ERROR) << "Master is missing the following minimum capabilities: "
<< strings::join<hashset<string>>(", ", missingCapabilities)
<< ". See the following documentation for steps to safely "
<< "recover from this state: "
<< "http://mesos.apache.org/documentation/latest/downgrades";
EXIT(EXIT_FAILURE);
}
foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
SlaveInfo slaveInfo = slave.info();
// We store the `SlaveInfo`'s resources in the `pre-reservation-refinement`
// format in order to support downgrades. We convert them back to `post-`
// format
// here so that we can keep our invariant of working with `post-` format
// resources within master memory.
upgradeResources(&slaveInfo);
slaves.recovered.put(slaveInfo.id(), slaveInfo);
}
foreach (const Registry::UnreachableSlave& unreachable,
registry.unreachable().slaves()) {
CHECK(!slaves.unreachable.contains(unreachable.id()));
slaves.unreachable[unreachable.id()] = unreachable.timestamp();
}
foreach (const Registry::GoneSlave& gone,
registry.gone().slaves()) {
slaves.gone[gone.id()] = gone.timestamp();
}
// Set up a timer for age-based registry GC.
scheduleRegistryGc();
// Set up a timeout for slaves to reregister.
slaves.recoveredTimer =
delay(flags.agent_reregister_timeout,
self(),
&Self::recoveredSlavesTimeout,
registry);
// Save the maintenance schedule.
foreach (const mesos::maintenance::Schedule& schedule, registry.schedules()) {
maintenance.schedules.push_back(schedule);
}
// Save the machine info for each machine.
foreach (const Registry::Machine& machine, registry.machines().machines()) {
machines[machine.info().id()] = Machine(machine.info());
}
// Save the quotas for each role.
foreach (const Registry::Quota& quota, registry.quotas()) {
quotas[quota.info().role()] = Quota{quota.info()};
}
// We notify the allocator via the `recover()` call. This has to be
// done before the first agent reregisters and makes its resources
// available for allocation. This is necessary because at this point
// the allocator is already initialized and ready to perform
// allocations. An allocator may decide to hold off with allocation
// until after it restores a view of the cluster state.
int expectedAgentCount = registry.slaves().slaves().size();
allocator->recover(expectedAgentCount, quotas);
// TODO(alexr): Consider adding a sanity check: whether quotas are
// satisfiable given all recovering agents reregister. We may want
// to notify operators early if total quota cannot be met.
// Recover weights, and update the allocator accordingly. If we
// recovered weights from the registry, any weights specified on the
// command-line are ignored. If no weights were recovered from the
// registry, any weights specified on the command-line are used and
// then stored in the registry.
vector<WeightInfo> weightInfos;
if (registry.weights_size() != 0) {
// TODO(Yongqiao Wang): After the Mesos master quorum is achieved,
// the operator can send an update weights request to do a batch
// configuration of weights, so the `--weights` flag can be
// deprecated and this check can eventually be removed.
if (!weights.empty()) {
LOG(WARNING) << "Ignoring --weights flag '" << flags.weights.get()
<< "' and recovering the weights from registry";
weights.clear();
}
foreach (const Registry::Weight& weight, registry.weights()) {
WeightInfo weightInfo;
weightInfo.set_role(weight.info().role());
weightInfo.set_weight(weight.info().weight());
weightInfos.push_back(weightInfo);
weights[weight.info().role()] = weight.info().weight();
}
} else if (!weights.empty()) {
foreachpair (const string& role, double weight, weights) {
WeightInfo weightInfo;
weightInfo.set_role(role);
weightInfo.set_weight(weight);
weightInfos.push_back(weightInfo);
}
registrar->apply(Owned<RegistryOperation>(
new weights::UpdateWeights(weightInfos)));
}
allocator->updateWeights(weightInfos);
// Recovery is now complete!
LOG(INFO) << "Recovered " << registry.slaves().slaves().size() << " agents"
<< " from the registry (" << Bytes(registry.ByteSize()) << ")"
<< "; allowing " << flags.agent_reregister_timeout
<< " for agents to reregister";
return Nothing();
}
void Master::scheduleRegistryGc()
{
registryGcTimer = delay(flags.registry_gc_interval,
self(),
&Self::doRegistryGc);
}
void Master::doRegistryGc()
{
// Schedule next periodic GC.
scheduleRegistryGc();
// Determine which unreachable agents to GC from the registry, if
// any. We do this by examining the master's in-memory copy of the
// unreachable list and checking two criteria, "age" and "count". To
// check the "count" criteria, we remove elements from the beginning
// of the list until it contains at most "registry_max_agent_count"
// elements (note that `slaves.unreachable` is a `LinkedHashMap`,
// which provides iteration over keys in insertion-order). To check
// the "age" criteria, we remove any element in the list whose age
// is more than "registry_max_agent_age". Note that for the latter,
// we check the entire list, not just the beginning: this avoids
// requiring that the list be kept sorted by timestamp.
//
// We build a candidate list of SlaveIDs to remove. We then try to
// remove this list from the registry. Note that all the slaveIDs we
// want to remove might not be found in the registrar's copy of the
// unreachable list; this can occur if there is a concurrent write
// (e.g., an unreachable agent we want to GC reregisters
// concurrently). In this situation, we skip removing any elements
// we don't find.
auto prune = [this](const LinkedHashMap<SlaveID, TimeInfo>& slaves) {
size_t count = slaves.size();
TimeInfo currentTime = protobuf::getCurrentTime();
hashset<SlaveID> toRemove;
foreachpair (const SlaveID& slaveId,
const TimeInfo& removalTime,
slaves) {
// Count-based GC.
CHECK(toRemove.size() <= count);
size_t liveCount = count - toRemove.size();
if (liveCount > flags.registry_max_agent_count) {
toRemove.insert(slaveId);
continue;
}
// Age-based GC.
Duration age = Nanoseconds(
currentTime.nanoseconds() - removalTime.nanoseconds());
if (age > flags.registry_max_agent_age) {
toRemove.insert(slaveId);
}
}
return toRemove;
};
hashset<SlaveID> toRemoveUnreachable = prune(slaves.unreachable);
hashset<SlaveID> toRemoveGone = prune(slaves.gone);
if (toRemoveUnreachable.empty() && toRemoveGone.empty()) {
VLOG(1) << "Skipping periodic registry garbage collection: "
<< "no agents qualify for removal";
return;
}
VLOG(1) << "Attempting to remove " << toRemoveUnreachable.size()
<< " unreachable and " << toRemoveGone.size()
<< " gone agents from the registry";
registrar->apply(Owned<RegistryOperation>(
new Prune(toRemoveUnreachable, toRemoveGone)))
.onAny(defer(self(),
&Self::_doRegistryGc,
toRemoveUnreachable,
toRemoveGone,
lambda::_1));
}
void Master::_doRegistryGc(
const hashset<SlaveID>& toRemoveUnreachable,
const hashset<SlaveID>& toRemoveGone,
const Future<bool>& registrarResult)
{
CHECK(!registrarResult.isDiscarded());
CHECK(!registrarResult.isFailed());
// `Prune` registry operation should never fail.
CHECK(registrarResult.get());
// Update in-memory state to be consistent with registry changes. If
// there was a concurrent registry operation that also modified the
// unreachable/gone list (e.g., an agent in `toRemoveXXX` concurrently
// reregistered), entries in `toRemove` might not appear in
// `slaves.unreachable` or `slaves.gone`.
//
// TODO(neilc): It would be nice to verify that the effect of these
// in-memory updates is equivalent to the changes made by the registry
// operation, but there isn't an easy way to do that.
size_t numRemovedUnreachable = 0;
foreach (const SlaveID& slaveId, toRemoveUnreachable) {
if (!slaves.unreachable.contains(slaveId)) {
LOG(WARNING) << "Failed to garbage collect " << slaveId
<< " from the unreachable list";
continue;
}
slaves.unreachable.erase(slaveId);
// TODO(vinod): Consider moving these tasks into `completedTasks` by
// transitioning them to a terminal state and sending status updates.
// But it's not clear what this state should be. If a framework
// reconciles these tasks after this point it would get `TASK_UNKNOWN`
// which seems appropriate but we don't keep tasks in this state in-memory.
if (slaves.unreachableTasks.contains(slaveId)) {
foreachkey (const FrameworkID& frameworkId,
slaves.unreachableTasks.at(slaveId)) {
Framework* framework = getFramework(frameworkId);
if (framework != nullptr) {
foreach (const TaskID& taskId,
slaves.unreachableTasks.at(slaveId).at(frameworkId)) {
framework->unreachableTasks.erase(taskId);
}
}
}
}
slaves.unreachableTasks.erase(slaveId);
numRemovedUnreachable++;
}
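// Repeat for gone agents. No per-agent task bookkeeping is kept for
// them in this path, so only `slaves.gone` needs updating here.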
size_t numRemovedGone = 0;
foreach (const SlaveID& slaveId, toRemoveGone) {
if (!slaves.gone.contains(slaveId)) {
LOG(WARNING) << "Failed to garbage collect " << slaveId
<< " from the gone list";
continue;
}
slaves.gone.erase(slaveId);
numRemovedGone++;
}
// TODO(neilc): Add a metric for # of agents discarded from the registry?
LOG(INFO) << "Garbage collected " << numRemovedUnreachable
<< " unreachable and " << numRemovedGone
<< " gone agents from the registry";
}
void Master::recoveredSlavesTimeout(const Registry& registry)
{
CHECK(elected());
// TODO(bmahler): Add a 'Percentage' abstraction.
Try<double> limit_ = numify<double>(
strings::remove(
flags.recovery_agent_removal_limit,
"%",
strings::SUFFIX));
CHECK_SOME(limit_);
double limit = limit_.get() / 100.0;
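// For example, a flag value of "40%" yields `limit == 0.4`.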
// Compute the percentage of slaves to be removed; if it exceeds the
// safety-net limit, bail!
double removalPercentage =
(1.0 * slaves.recovered.size()) /
(1.0 * registry.slaves().slaves().size());
if (removalPercentage > limit) {
EXIT(EXIT_FAILURE)
<< "Post-recovery agent removal limit exceeded! After "
<< flags.agent_reregister_timeout
<< " there were " << slaves.recovered.size()
<< " (" << removalPercentage * 100 << "%) agents recovered from the"
<< " registry that did not reregister: \n"
<< stringify(slaves.recovered.keys()) << "\n "
<< " The configured removal limit is " << limit * 100 << "%. Please"
<< " investigate or increase this limit to proceed further";
}
// Remove the slaves in a rate limited manner, similar to how the
// SlaveObserver removes slaves.
foreach (const Registry::Slave& slave, registry.slaves().slaves()) {
// The slave is removed from `recovered` when it completes the
// re-registration process. If the slave is in `reregistering`, it
// has started but not yet finished reregistering. In either
// case, we don't want to try to remove it.
if (!slaves.recovered.contains(slave.info().id()) ||
slaves.reregistering.contains(slave.info().id())) {
continue;
}
Future<Nothing> acquire = Nothing();
if (slaves.limiter.isSome()) {
LOG(INFO) << "Scheduling removal of agent "
<< slave.info().id() << " (" << slave.info().hostname() << ")"
<< "; did not reregister within "
<< flags.agent_reregister_timeout << " after master failover";
acquire = slaves.limiter.get()->acquire();
}
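// At this point `acquire` is either an already-satisfied future (no rate
// limiter configured) or a future that becomes ready once a rate limit
// token is obtained.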
const string failure = "Agent removal rate limit acquisition failed";
// TODO(bmahler): Cancellation currently occurs by returning early
// from within `markUnreachable`, *without* the "discarder" having
// discarded the rate limit token. This approach means that if
// agents reregister while many of the marking unreachable
// operations are in progress, the rate that we mark unreachable
// will "slow down" rather than stay constant. We should instead
// discard the rate limit token when the agent reregisters and
// handle the discard here. See MESOS-8386.
acquire
.onFailed(lambda::bind(fail, failure, lambda::_1))
.onDiscarded(lambda::bind(fail, failure, "discarded"))
.then(defer(self(),
&Self::markUnreachable,
slave.info(),
true,
"did not reregister within"
" " + stringify(flags.agent_reregister_timeout) +
" after master failover"))
.then(defer(self(), [=](bool marked) {
if (marked) {
++metrics->slave_unreachable_completed;
} else {
++metrics->slave_unreachable_canceled;
}
return Nothing();
}));
++metrics->slave_unreachable_scheduled;
}
}
void Master::sendSlaveLost(const SlaveInfo& slaveInfo)
{
foreachvalue (Framework* framework, frameworks.registered) {
if (!framework->connected()) {
continue;
}
LOG(INFO) << "Notifying framework " << *framework << " of lost agent "
<< slaveInfo.id() << " (" << slaveInfo.hostname() << ")";
LostSlaveMessage message;
message.mutable_slave_id()->MergeFrom(slaveInfo.id());
framework->send(message);
}
if (HookManager::hooksAvailable()) {
HookManager::masterSlaveLostHook(slaveInfo);
}
}
void Master::fileAttached(const Future<Nothing>& result, const string& path)
{
if (result.isReady()) {
LOG(INFO) << "Successfully attached file '" << path << "'";
} else {
LOG(ERROR) << "Failed to attach file '" << path << "': "
<< (result.isFailed() ? result.failure() : "discarded");
}
}
void Master::submitScheduler(const string& name)
{
LOG(INFO) << "Scheduler submit request for " << name;
SubmitSchedulerResponse response;
response.set_okay(false);
reply(response);
}
void Master::contended(const Future<Future<Nothing>>& candidacy)
{
CHECK(!candidacy.isDiscarded());
if (candidacy.isFailed()) {
EXIT(EXIT_FAILURE) << "Failed to contend: " << candidacy.failure();
}
// Watch for candidacy change.
candidacy
->onAny(defer(self(), &Master::lostCandidacy, lambda::_1));
}
void Master::lostCandidacy(const Future<Nothing>& lost)
{
CHECK(!lost.isDiscarded());
if (lost.isFailed()) {
EXIT(EXIT_FAILURE) << "Failed to watch for candidacy: " << lost.failure();
}
if (elected()) {
EXIT(EXIT_FAILURE) << "Lost candidacy as a leader... committing suicide!";
}
LOG(INFO) << "Lost candidacy as a follower... Contend again";
contender->contend()
.onAny(defer(self(), &Master::contended, lambda::_1));
}
void Master::detected(const Future<Option<MasterInfo>>& _leader)
{
CHECK(!_leader.isDiscarded());
if (_leader.isFailed()) {
EXIT(EXIT_FAILURE)
<< "Failed to detect the leading master: " << _leader.failure()
<< "; committing suicide!";
}
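// Remember whether this master was the leader before applying the new
// detection result, so the cases below can distinguish a first election,
// a re-election, and a loss of leadership.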
bool wasElected = elected();
leader = _leader.get();
if (elected()) {
electedTime = Clock::now();
if (!wasElected) {
LOG(INFO) << "Elected as the leading master!";
// Begin the recovery process; bail if it fails or is discarded.
recover()
.onFailed(lambda::bind(fail, "Recovery failed", lambda::_1))
.onDiscarded(lambda::bind(fail, "Recovery failed", "discarded"));
} else {
// This happens if there is a ZK blip that causes a re-election
// but the same leading master is elected as leader.
LOG(INFO) << "Re-elected as the leading master";
}
} else if (leader.isSome()) {
// A different node has been elected as the leading master.
LOG(INFO) << "The newly elected leader is " << leader->pid()
<< " with id " << leader->id();
if (wasElected) {
EXIT(EXIT_FAILURE) << "Conceded leadership to another master..."
<< " committing suicide!";
}
// If this master and the current leader both have a configured
// domain and the current leader is located in a different region,
// exit with an error message: this indicates a configuration
// error, since all masters must be in the same region.
if (leader->has_domain() && info_.has_domain()) {
const DomainInfo& leaderDomain = leader->domain();
const DomainInfo& selfDomain = info_.domain();
// We currently reject configured domains without fault domains,
// but that might change in the future. For compatibility with
// future versions of Mesos, we treat a master with a configured
// domain but no fault domain as equivalent to a master with no
// configured domain.
if (leaderDomain.has_fault_domain() && selfDomain.has_fault_domain()) {
const DomainInfo::FaultDomain::RegionInfo& leaderRegion =
leaderDomain.fault_domain().region();
const DomainInfo::FaultDomain::RegionInfo& selfRegion =
selfDomain.fault_domain().region();
if (leaderRegion != selfRegion) {
EXIT(EXIT_FAILURE) << "Leading master uses domain "
<< leaderDomain << "; this master is "
<< "configured to use domain "
<< selfDomain << "; all masters in the "
<< "same cluster must use the same region";
}
}
}
} else {
// If an election occurred and no leader was elected, `None` is returned.
LOG(INFO) << "No master was elected.";
if (wasElected) {
EXIT(EXIT_FAILURE) << "Lost leadership after indecisive election..."
<< " committing suicide!";
}
}
// Keep detecting.
detector->detect(leader)
.onAny(defer(self(), &Master::detected, lambda::_1));
}
Future<bool> Master::authorizeFramework(
const FrameworkInfo& frameworkInfo)
{
if (authorizer.isNone()) {
return true; // Authorization is disabled.
}
LOG(INFO) << "Authorizing framework principal '" << frameworkInfo.principal()
<< "' to receive offers for roles '"
<< stringify(protobuf::framework::getRoles(frameworkInfo)) << "'";
authorization::Request request;
request.set_action(authorization::REGISTER_FRAMEWORK);
if (frameworkInfo.has_principal()) {
request.mutable_subject()->set_value(frameworkInfo.principal());
}
request.mutable_object()->mutable_framework_info()->CopyFrom(frameworkInfo);
// For a non-`MULTI_ROLE` framework, also propagate its single role
// via the request's `value` field. This is purely for backwards
// compatibility as the `value` field is deprecated. Note that this
// means that authorizers relying on the deprecated field will see
// an empty string in `value` for `MULTI_ROLE` frameworks.
//
// TODO(bbannier): Remove this at the end of `value`'s deprecation
// cycle, see MESOS-7073.
if (!protobuf::frameworkHasCapability(
frameworkInfo, FrameworkInfo::Capability::MULTI_ROLE)) {
request.mutable_object()->set_value(frameworkInfo.role());
}
return authorizer.get()->authorized(request);
}
Option<Error> Master::validateFrameworkAuthentication(
const FrameworkInfo& frameworkInfo,
const UPID& from)
{
if (authenticating.contains(from)) {
return Error("Re-authentication in progress");
}
if (flags.authenticate_frameworks && !authenticated.contains(from)) {
// This could happen if another authentication request came
// through before we got here, or if a framework tried to
// (re-)register without authentication.
return Error("Framework at " + stringify(from) + " is not authenticated");
}
// TODO(bmahler): Currently the scheduler driver does not
// set 'principal', so we allow frameworks to omit it.
if (frameworkInfo.has_principal() &&
authenticated.contains(from) &&
frameworkInfo.principal() != authenticated[from]) {
return Error("Framework principal '" + frameworkInfo.principal() + "'"
" does not match authenticated principal"
" '" + authenticated[from] + "'");
}
return None();
}
void Master::drop(
const UPID& from,
const scheduler::Call& call,
const string& message)
{
// TODO(bmahler): Increment a metric.
LOG(WARNING) << "Dropping " << call.type() << " call"
<< " from framework " << call.framework_id()
<< " at " << from << ": " << message;
}
void Master::drop(
Framework* framework,
const Offer::Operation& operation,
const string& message)
{
CHECK_NOTNULL(framework);
// TODO(jieyu): Increment a metric.
LOG(WARNING) << "Dropping " << Offer::Operation::Type_Name(operation.type())
<< " operation from framework " << *framework
<< ": " << message;
// NOTE: The operation validation code should be refactored. Due to the order
// of validation, it's possible that this function will be called before the
// master validates that operations from v0 frameworks should not have their
// ID set.
if (operation.has_id() && framework->http.isSome()) {
scheduler::Event update;
update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
// NOTE: We do not attempt to set the agent or resource provider IDs for
// dropped operations as we cannot guarantee to always know their values.
//
// TODO(bbannier): Set agent or resource provider ID if we know
// for certain that the operation was valid.
*update.mutable_update_operation_status()->mutable_status() =
protobuf::createOperationStatus(
OperationState::OPERATION_ERROR,
operation.id(),
message);
framework->send(update);
}
}
void Master::drop(
Framework* framework,
const scheduler::Call& call,
const string& message)
{
CHECK_NOTNULL(framework);
// TODO(gyliu513): Increment a metric.
LOG(WARNING) << "Dropping " << call.type() << " call"
<< " from framework " << *framework
<< ": " << message;
}
void Master::drop(
Framework* framework,
const scheduler::Call::Suppress& suppress,
const string& message)
{
scheduler::Call call;
call.set_type(scheduler::Call::SUPPRESS);
call.mutable_suppress()->CopyFrom(suppress);
drop(framework, call, message);
}
void Master::drop(
Framework* framework,
const scheduler::Call::Revive& revive,
const string& message)
{
scheduler::Call call;
call.set_type(scheduler::Call::REVIVE);
call.mutable_revive()->CopyFrom(revive);
drop(framework, call, message);
}
void Master::receive(
const UPID& from,
scheduler::Call&& call)
{
// TODO(vinod): Add metrics for calls.
Option<Error> error = validation::scheduler::call::validate(call);
if (error.isSome()) {
metrics->incrementInvalidSchedulerCalls(call);
drop(from, call, error->message);
return;
}
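// SUBSCRIBE is handled first because the framework may not be registered
// yet, so the framework lookup below does not apply to it.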
if (call.type() == scheduler::Call::SUBSCRIBE) {
subscribe(from, call.subscribe());
return;
}
// We consolidate the framework lookup and pid validation logic here
// because they are common to all the call handlers.
Framework* framework = getFramework(call.framework_id());
if (framework == nullptr) {
drop(from, call, "Framework cannot be found");
return;
}
if (framework->pid != from) {
drop(from, call, "Call is not from registered framework");
return;
}
framework->metrics.incrementCall(call.type());
// This is possible when the master --> framework link is broken (i.e., a
// one-way network partition) and the framework is not aware of it. There
// is no way for driver-based frameworks to detect this in the absence
// of periodic heartbeat events. We send an error message to the framework
// of periodic heartbeat events. We send an error message to the framework
// causing the scheduler driver to abort when this happens.
if (!framework->connected()) {
const string error = "Framework disconnected";
LOG(INFO) << "Refusing " << call.type() << " call from framework "
<< *framework << ": " << error;
FrameworkErrorMessage message;
message.set_message(error);
send(from, message);
return;
}
switch (call.type()) {
case scheduler::Call::SUBSCRIBE:
// SUBSCRIBE call should have been handled above.
LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
case scheduler::Call::TEARDOWN:
teardown(framework);
break;
case scheduler::Call::ACCEPT:
accept(framework, std::move(*call.mutable_accept()));
break;
case scheduler::Call::DECLINE:
decline(framework, std::move(*call.mutable_decline()));
break;
case scheduler::Call::ACCEPT_INVERSE_OFFERS:
acceptInverseOffers(framework, call.accept_inverse_offers());
break;
case scheduler::Call::DECLINE_INVERSE_OFFERS:
declineInverseOffers(framework, call.decline_inverse_offers());
break;
case scheduler::Call::REVIVE:
revive(framework, call.revive());
break;
case scheduler::Call::KILL:
kill(framework, call.kill());
break;
case scheduler::Call::SHUTDOWN:
shutdown(framework, call.shutdown());
break;
case scheduler::Call::ACKNOWLEDGE: {
acknowledge(framework, std::move(*call.mutable_acknowledge()));
break;
}
case scheduler::Call::ACKNOWLEDGE_OPERATION_STATUS: {
drop(
from,
call,
"'ACKNOWLEDGE_OPERATION_STATUS' is not supported by the v0 API");
break;
}
case scheduler::Call::RECONCILE:
reconcile(framework, std::move(*call.mutable_reconcile()));
break;
case scheduler::Call::RECONCILE_OPERATIONS:
drop(
from,
call,
"'RECONCILE_OPERATIONS' is not supported by the v0 API");
break;
case scheduler::Call::MESSAGE:
message(framework, std::move(*call.mutable_message()));
break;
case scheduler::Call::REQUEST:
request(framework, call.request());
break;
case scheduler::Call::SUPPRESS:
suppress(framework, call.suppress());
break;
case scheduler::Call::UNKNOWN:
LOG(WARNING) << "'UNKNOWN' call";
break;
}
}
void Master::registerFramework(
const UPID& from,
RegisterFrameworkMessage&& registerFrameworkMessage)
{
FrameworkInfo frameworkInfo =
std::move(*registerFrameworkMessage.mutable_framework());
if (frameworkInfo.has_id() && !frameworkInfo.id().value().empty()) {
const string error = "Registering with 'id' already set";
LOG(INFO) << "Refusing registration request of framework"
<< " '" << frameworkInfo.name() << "' at " << from
<< ": " << error;
FrameworkErrorMessage message;
message.set_message(error);
send(from, message);
return;
}
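// Translate the legacy registration message into a SUBSCRIBE call and
// reuse the common subscription path.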
scheduler::Call::Subscribe call;
*call.mutable_framework_info() = std::move(frameworkInfo);
subscribe(from, call);
}
void Master::reregisterFramework(
const UPID& from,
ReregisterFrameworkMessage&& reregisterFrameworkMessage)
{
FrameworkInfo frameworkInfo =
std::move(*reregisterFrameworkMessage.mutable_framework());
if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) {
const string error = "Re-registering without an 'id'";
LOG(INFO) << "Refusing re-registration request of framework"
<< " '" << frameworkInfo.name() << "' at " << from
<< ": " << error;
FrameworkErrorMessage message;
message.set_message(error);
send(from, message);
return;
}
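// As above, translate into a SUBSCRIBE call; the message's failover flag
// is carried via `force`.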
scheduler::Call::Subscribe call;
*call.mutable_framework_info() = std::move(frameworkInfo);
call.set_force(reregisterFrameworkMessage.failover());
subscribe(from, call);
}
void Master::subscribe(
HttpConnection http,
const scheduler::Call::Subscribe& subscribe)
{
// TODO(anand): Authenticate the framework.
const FrameworkInfo& frameworkInfo = subscribe.framework_info();
// Update messages_{re}register_framework accordingly.
if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
++metrics->messages_register_framework;
} else {
++metrics->messages_reregister_framework;
}
LOG(INFO) << "Received subscription request for"
<< " HTTP framework '" << frameworkInfo.name() << "'";
Option<Error> validationError =
validation::framework::validate(frameworkInfo);
if (validationError.isNone()) {
// Check the framework's role(s) against the whitelist.
set<string> invalidRoles;
if (protobuf::frameworkHasCapability(
frameworkInfo,
FrameworkInfo::Capability::MULTI_ROLE)) {
foreach (const string& role, frameworkInfo.roles()) {
if (!isWhitelistedRole(role)) {
invalidRoles.insert(role);
}
}
} else {
if (!isWhitelistedRole(frameworkInfo.role())) {
invalidRoles.insert(frameworkInfo.role());
}
}
if (!invalidRoles.empty()) {
validationError = Error("Roles " + stringify(invalidRoles) +
" are not present in master's --roles");
}
}
// Ensure each of the suppressed roles is contained in the list of roles.
set<string> frameworkRoles = protobuf::framework::getRoles(frameworkInfo);
set<string> suppressedRoles = set<string>(
subscribe.suppressed_roles().begin(), subscribe.suppressed_roles().end());
if (validationError.isNone()) {
// The suppressed roles must be contained within the list of all
// roles for the framework.
foreach (const string& role, suppressedRoles) {
if (!frameworkRoles.count(role)) {
validationError = Error("Suppressed role '" + role +
"' is not contained in the list of roles");
break;
}
}
}
// TODO(vinod): Deprecate this in favor of authorization.
if (validationError.isNone() &&
frameworkInfo.user() == "root" && !flags.root_submissions) {
validationError = Error("User 'root' is not allowed to run frameworks"
" without --root_submissions set");
}
if (validationError.isNone() && frameworkInfo.has_id() &&
isCompletedFramework(frameworkInfo.id())) {
// This could happen if a framework tries to subscribe after its failover
// timeout has elapsed, or it has been torn down via the operator API.
//
// TODO(vinod): Master should persist admitted frameworks to the
// registry and remove them from it after failover timeout.
validationError = Error("Framework has been removed");
}
if (validationError.isNone() && !isValidFailoverTimeout(frameworkInfo)) {
validationError = Error("The framework failover_timeout (" +
stringify(frameworkInfo.failover_timeout()) +
") is invalid");
}
if (validationError.isSome()) {
LOG(INFO) << "Refusing subscription of framework"
<< " '" << frameworkInfo.name() << "': "
<< validationError->message;
FrameworkErrorMessage message;
message.set_message(validationError->message);
http.send(message);
http.close();
return;
}
// Need to disambiguate for the compiler.
void (Master::*_subscribe)(
HttpConnection,
const FrameworkInfo&,
bool,
const set<string>&,
const Future<bool>&) = &Self::_subscribe;
authorizeFramework(frameworkInfo)
.onAny(defer(self(),
_subscribe,
http,
frameworkInfo,
subscribe.force(),
suppressedRoles,
lambda::_1));
}
void Master::_subscribe(
HttpConnection http,
const FrameworkInfo& frameworkInfo,
bool force,
const set<string>& suppressedRoles,
const Future<bool>& authorized)
{
CHECK(!authorized.isDiscarded());
Option<Error> authorizationError = None();
if (authorized.isFailed()) {
authorizationError =
Error("Authorization failure: " + authorized.failure());
} else if (!authorized.get()) {
authorizationError = Error(
"Not authorized to use roles '" +
stringify(protobuf::framework::getRoles(frameworkInfo)) + "'");
}
if (authorizationError.isSome()) {
LOG(INFO) << "Refusing subscription of framework"
<< " '" << frameworkInfo.name