/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __MASTER_HPP__
#define __MASTER_HPP__
#include <stdint.h>
#include <list>
#include <string>
#include <vector>
#include <boost/circular_buffer.hpp>
#include <mesos/resources.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/timer.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/gauge.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/cache.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/memory.hpp>
#include <stout/multihashmap.hpp>
#include <stout/option.hpp>
#include "common/type_utils.hpp"
#include "files/files.hpp"
#include "master/constants.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
#include "master/flags.hpp"
#include "master/registrar.hpp"
#include "messages/messages.hpp"
namespace process {
class RateLimiter; // Forward declaration.
}
namespace mesos {
namespace internal {
// Forward declarations.
namespace registry {
class Slaves;
}
namespace sasl {
class Authenticator;
}
class Authorizer;
namespace master {
// Forward declarations.
namespace allocator {
class Allocator;
}
class Repairer;
class SlaveObserver;
class WhitelistWatcher;
struct Framework;
struct OfferVisitor;
struct Role;
struct Slave;
struct TaskInfoVisitor;
class Master : public ProtobufProcess<Master>
{
public:
Master(allocator::Allocator* allocator,
Registrar* registrar,
Repairer* repairer,
Files* files,
MasterContender* contender,
MasterDetector* detector,
const Option<Authorizer*>& authorizer,
const Flags& flags = Flags());
virtual ~Master();
void submitScheduler(
const std::string& name);
void registerFramework(
const process::UPID& from,
const FrameworkInfo& frameworkInfo);
void reregisterFramework(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
bool failover);
void unregisterFramework(
const process::UPID& from,
const FrameworkID& frameworkId);
void deactivateFramework(
const process::UPID& from,
const FrameworkID& frameworkId);
void resourceRequest(
const process::UPID& from,
const FrameworkID& frameworkId,
const std::vector<Request>& requests);
void launchTasks(
const process::UPID& from,
const FrameworkID& frameworkId,
const std::vector<TaskInfo>& tasks,
const Filters& filters,
const std::vector<OfferID>& offerIds);
void reviveOffers(
const process::UPID& from,
const FrameworkID& frameworkId);
void killTask(
const process::UPID& from,
const FrameworkID& frameworkId,
const TaskID& taskId);
void statusUpdateAcknowledgement(
const process::UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const TaskID& taskId,
const std::string& uuid);
void schedulerMessage(
const process::UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const std::string& data);
void registerSlave(
const process::UPID& from,
const SlaveInfo& slaveInfo);
void reregisterSlave(
const process::UPID& from,
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const std::vector<ExecutorInfo>& executorInfos,
const std::vector<Task>& tasks,
const std::vector<Archive::Framework>& completedFrameworks);
void unregisterSlave(
const process::UPID& from,
const SlaveID& slaveId);
void statusUpdate(
const StatusUpdate& update,
const process::UPID& pid);
void exitedExecutor(
const process::UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
int32_t status);
void shutdownSlave(
const SlaveID& slaveId,
const std::string& message);
// TODO(bmahler): It would be preferable to use a unique libprocess
// Process identifier (PID is not sufficient) for identifying the
// framework instance, rather than relying on re-registration time.
void frameworkFailoverTimeout(
const FrameworkID& frameworkId,
const process::Time& reregisteredTime);
void offer(
const FrameworkID& framework,
const hashmap<SlaveID, Resources>& resources);
void reconcileTasks(
const process::UPID& from,
const FrameworkID& frameworkId,
const std::vector<TaskStatus>& statuses);
void authenticate(
const process::UPID& from,
const process::UPID& pid);
// Invoked when there is a newly elected leading master.
// Made public for testing purposes.
void detected(const process::Future<Option<MasterInfo> >& pid);
// Invoked when the contender has lost the candidacy.
// Made public for testing purposes.
void lostCandidacy(const process::Future<Nothing>& lost);
// Continuation of recover().
// Made public for testing purposes.
process::Future<Nothing> _recover(const Registry& registry);
// Continuation of reregisterSlave().
// Made public for testing purposes.
// TODO(vinod): Instead of doing this create and use a
// MockRegistrar.
// TODO(dhamon): Consider FRIEND_TEST macro from gtest.
void _reregisterSlave(
const SlaveInfo& slaveInfo,
const process::UPID& pid,
const std::vector<ExecutorInfo>& executorInfos,
const std::vector<Task>& tasks,
const std::vector<Archive::Framework>& completedFrameworks,
const process::Future<bool>& readmit);
MasterInfo info() const
{
return info_;
}
protected:
virtual void initialize();
virtual void finalize();
virtual void exited(const process::UPID& pid);
virtual void visit(const process::MessageEvent& event);
virtual void visit(const process::ExitedEvent& event);
// Invoked when the message is ready to be executed after
// being throttled.
// 'principal' being None indicates it is throttled by
// 'defaultLimiter'.
void throttled(
const process::MessageEvent& event,
const Option<std::string>& principal);
// Continuations of visit().
void _visit(const process::MessageEvent& event);
void _visit(const process::ExitedEvent& event);
// Helper method invoked when the capacity for a framework
// principal is exceeded.
void exceededCapacity(
const process::MessageEvent& event,
const Option<std::string>& principal,
uint64_t capacity);
// Recovers state from the registrar.
process::Future<Nothing> recover();
void recoveredSlavesTimeout(const Registry& registry);
void _registerSlave(
const SlaveInfo& slaveInfo,
const process::UPID& pid,
const process::Future<bool>& admit);
void __reregisterSlave(
Slave* slave,
const std::vector<Task>& tasks);
// 'promise' is used to signal completion of authentication.
// 'future' is the future returned by the authenticator.
void _authenticate(
const process::UPID& pid,
const process::Owned<process::Promise<Nothing> >& promise,
const process::Future<Option<std::string> >& future);
void authenticationTimeout(process::Future<Option<std::string> > future);
void fileAttached(const process::Future<Nothing>& result,
const std::string& path);
// Returns connected frameworks that are not in the process of being removed.
std::vector<Framework*> getActiveFrameworks() const;
// Invoked when the contender has entered the contest.
void contended(const process::Future<process::Future<Nothing> >& candidacy);
// Reconciles a re-registering slave's tasks / executors and sends
// TASK_LOST updates for tasks known to the master but unknown to
// the slave.
void reconcile(
Slave* slave,
const std::vector<ExecutorInfo>& executors,
const std::vector<Task>& tasks);
// 'registerFramework()' continuation.
void _registerFramework(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const process::Future<Option<Error> >& validationError);
// 'reregisterFramework()' continuation.
void _reregisterFramework(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
bool failover,
const process::Future<Option<Error> >& validationError);
// Add a framework.
void addFramework(Framework* framework);
// Replace the scheduler for a framework with a new process ID, in
// the event of a scheduler failover.
void failoverFramework(Framework* framework, const process::UPID& newPid);
// Kill all of a framework's tasks, delete the framework object, and
// reschedule offers that were assigned to this framework.
void removeFramework(Framework* framework);
// Remove a framework from the slave, i.e., remove its tasks and
// executors and recover the resources.
void removeFramework(Slave* slave, Framework* framework);
void deactivate(Framework* framework);
void disconnect(Slave* slave);
// Add a slave.
void addSlave(Slave* slave, bool reregister = false);
void readdSlave(
Slave* slave,
const std::vector<ExecutorInfo>& executorInfos,
const std::vector<Task>& tasks,
const std::vector<Archive::Framework>& completedFrameworks);
// Remove the slave from the registrar and from the master's state.
void removeSlave(Slave* slave);
void _removeSlave(
const SlaveInfo& slaveInfo,
const std::vector<StatusUpdate>& updates,
const process::Future<bool>& removed);
// Validates the task including authorization.
// Returns None if the task is valid.
// Returns Error if the task is invalid.
// Returns Failure if authorization returns 'Failure'.
process::Future<Option<Error> > validateTask(
const TaskInfo& task,
Framework* framework,
Slave* slave,
const Resources& totalResources);
// Launch a task from a task description.
void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
// 'launchTasks()' continuation.
void _launchTasks(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const std::vector<TaskInfo>& tasks,
const Resources& totalResources,
const Filters& filters,
const process::Future<std::list<process::Future<Option<Error> > > >& f);
// Remove a task.
void removeTask(Task* task);
// Forwards the update to the framework.
void forward(
const StatusUpdate& update,
const process::UPID& acknowledgee,
Framework* framework);
// Remove an offer and optionally rescind the offer as well.
void removeOffer(Offer* offer, bool rescind = false);
Framework* getFramework(const FrameworkID& frameworkId);
Slave* getSlave(const SlaveID& slaveId);
Offer* getOffer(const OfferID& offerId);
FrameworkID newFrameworkId();
OfferID newOfferId();
SlaveID newSlaveId();
Option<Credentials> credentials;
private:
// Inner class used to namespace HTTP route handlers (see
// master/http.cpp for implementations).
class Http
{
public:
explicit Http(Master* _master) : master(_master) {}
// /master/health
process::Future<process::http::Response> health(
const process::http::Request& request);
// /master/observe
process::Future<process::http::Response> observe(
const process::http::Request& request);
// /master/redirect
process::Future<process::http::Response> redirect(
const process::http::Request& request);
// /master/roles.json
process::Future<process::http::Response> roles(
const process::http::Request& request);
// /master/shutdown
process::Future<process::http::Response> shutdown(
const process::http::Request& request);
// /master/state.json
process::Future<process::http::Response> state(
const process::http::Request& request);
// /master/stats.json
process::Future<process::http::Response> stats(
const process::http::Request& request);
// /master/tasks.json
process::Future<process::http::Response> tasks(
const process::http::Request& request);
const static std::string HEALTH_HELP;
const static std::string OBSERVE_HELP;
const static std::string REDIRECT_HELP;
const static std::string SHUTDOWN_HELP;
const static std::string TASKS_HELP;
private:
// Helper for doing authentication. Returns the credential used if
// authentication was successful (or None if no credentials have been
// given to the master); otherwise returns an Error.
Result<Credential> authenticate(const process::http::Request& request);
// Continuations.
process::Future<process::http::Response> _shutdown(
const FrameworkID& id,
bool authorized = true);
Master* master;
} http;
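// NOTE: The handlers above are implemented in master/http.cpp. A
// hypothetical sketch of how one of them might be registered as a
// libprocess route (assuming the route() helper and stout's
// lambda::bind; the actual wiring is done elsewhere):
//
//   route("/health", Http::HEALTH_HELP,
//         lambda::bind(&Master::Http::health, &http, lambda::_1));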
Master(const Master&); // No copying.
Master& operator = (const Master&); // No assigning.
friend struct OfferVisitor;
const Flags flags;
Option<MasterInfo> leader; // Current leading master.
// Whether we are the current leading master.
bool elected() const
{
return leader.isSome() && leader.get() == info_;
}
allocator::Allocator* allocator;
WhitelistWatcher* whitelistWatcher;
Registrar* registrar;
Repairer* repairer;
Files* files;
MasterContender* contender;
MasterDetector* detector;
const Option<Authorizer*> authorizer;
MasterInfo info_;
// Indicates when recovery is complete. Recovery begins once the
// master is elected as a leader.
Option<process::Future<Nothing> > recovered;
struct Slaves
{
Slaves() : removed(MAX_REMOVED_SLAVES) {}
// Imposes a time limit within which slaves recovered from the
// registry must re-register with the master.
Option<process::Timer> recoveredTimer;
// Slaves that have been recovered from the registrar but have yet
// to re-register. We keep the 'recoveredTimer' above to ensure
// we remove these slaves if they do not re-register.
hashset<SlaveID> recovered;
// Slaves that are in the process of registering.
hashset<process::UPID> registering;
// Slaves that are re-registering with this master for the first
// time. We must not answer questions related to these slaves
// until the registrar determines their fate.
hashset<SlaveID> reregistering;
hashmap<SlaveID, Slave*> registered;
// Slaves that are in the process of being removed from the
// registrar. Think of these as being partially removed: we must
// not answer questions related to these until they are removed
// from the registry.
hashset<SlaveID> removing;
// We track removed slaves to preserve the consistency
// semantics of the pre-registrar code when a non-strict registrar
// is being used. That is, if we remove a slave, we must make
// an effort to prevent it from (re-)registering, sending updates,
// etc. We keep a cache here to prevent this from growing in an
// unbounded manner.
// TODO(bmahler): Ideally we could use a cache with set semantics.
Cache<SlaveID, Nothing> removed;
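// Returns true if the given slave is still in one of the
// transitional sets above ('recovered', 'reregistering' or
// 'removing'); with no SlaveID it reports whether any slave is.
// A hypothetical caller guarding a state-dependent code path
// might check:
//
//   if (slaves.transitioning(slaveId)) {
//     return; // Wait for the registrar to determine the slave's fate.
//   }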
bool transitioning(const Option<SlaveID>& slaveId)
{
if (slaveId.isSome()) {
return recovered.contains(slaveId.get()) ||
reregistering.contains(slaveId.get()) ||
removing.contains(slaveId.get());
} else {
return !recovered.empty() ||
!reregistering.empty() ||
!removing.empty();
}
}
} slaves;
struct Frameworks
{
Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
hashmap<FrameworkID, Framework*> registered;
boost::circular_buffer<memory::shared_ptr<Framework> > completed;
// Principals of frameworks keyed by PID.
// NOTE: Multiple PIDs can map to the same principal. The
// principal is None when the framework doesn't specify it.
// The differences between this map and 'authenticated' are:
// 1) This map only includes *registered* frameworks. The mapping
// is added when a framework (re-)registers.
// 2) This map includes unauthenticated frameworks (when the master
// allows them) if they have principals specified in
// FrameworkInfo.
hashmap<process::UPID, Option<std::string> > principals;
} frameworks;
hashmap<OfferID, Offer*> offers;
hashmap<std::string, Role*> roles;
// Frameworks/slaves that are currently in the process of authenticating.
// The 'authenticating' future for an authenticatee is ready once it has
// been authenticated.
hashmap<process::UPID, process::Future<Nothing> > authenticating;
hashmap<process::UPID, process::Owned<sasl::Authenticator> > authenticators;
// Principals of authenticated frameworks/slaves keyed by PID.
hashmap<process::UPID, std::string> authenticated;
int64_t nextFrameworkId; // Used to give each framework a unique ID.
int64_t nextOfferId; // Used to give each slot offer a unique ID.
int64_t nextSlaveId; // Used to give each slave a unique ID.
// TODO(bmahler): These are deprecated! Please use metrics instead.
// Statistics (initialized in Master::initialize).
struct
{
uint64_t tasks[TaskState_ARRAYSIZE];
uint64_t validStatusUpdates;
uint64_t invalidStatusUpdates;
uint64_t validFrameworkMessages;
uint64_t invalidFrameworkMessages;
} stats;
struct Metrics
{
Metrics(const Master& master);
~Metrics();
process::metrics::Gauge uptime_secs;
process::metrics::Gauge elected;
process::metrics::Gauge slaves_active;
process::metrics::Gauge slaves_inactive;
process::metrics::Gauge frameworks_active;
process::metrics::Gauge frameworks_inactive;
process::metrics::Gauge outstanding_offers;
// Task state metrics.
process::metrics::Gauge tasks_staging;
process::metrics::Gauge tasks_starting;
process::metrics::Gauge tasks_running;
process::metrics::Counter tasks_finished;
process::metrics::Counter tasks_failed;
process::metrics::Counter tasks_killed;
process::metrics::Counter tasks_lost;
// Message counters.
process::metrics::Counter dropped_messages;
// Metrics specific to frameworks of a common principal.
// These metrics have names prefixed by "frameworks/<principal>/".
struct Frameworks
{
// Counters for messages from all frameworks of this principal.
// Note: We only count messages from active scheduler
// *instances* while they are *registered*. That is, messages
// prior to the completion of (re)registration
// (AuthenticateMessage and (Re)RegisterFrameworkMessage) and
// messages from an inactive scheduler instance (after the
// framework has failed over) are not counted.
// Framework messages received (before processing).
process::metrics::Counter messages_received;
// Framework messages processed.
// NOTE: This doesn't include dropped messages. Processing of
// a message may be throttled by a RateLimiter if one is
// configured for this principal. Also, because the Master is
// asynchronous, a processed message doesn't necessarily mean
// the work it requested has finished.
process::metrics::Counter messages_processed;
explicit Frameworks(const std::string& principal)
: messages_received("frameworks/" + principal + "/messages_received"),
messages_processed("frameworks/" + principal + "/messages_processed")
{
process::metrics::add(messages_received);
process::metrics::add(messages_processed);
}
~Frameworks()
{
process::metrics::remove(messages_received);
process::metrics::remove(messages_processed);
}
};
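// For example, a (hypothetical) framework principal "foo" yields
// the counters "frameworks/foo/messages_received" and
// "frameworks/foo/messages_processed", per the constructor above.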
// Per-framework-principal metrics keyed by the framework
// principal.
hashmap<std::string, process::Owned<Frameworks> > frameworks;
// Messages from schedulers.
process::metrics::Counter messages_register_framework;
process::metrics::Counter messages_reregister_framework;
process::metrics::Counter messages_unregister_framework;
process::metrics::Counter messages_deactivate_framework;
process::metrics::Counter messages_kill_task;
process::metrics::Counter messages_status_update_acknowledgement;
process::metrics::Counter messages_resource_request;
process::metrics::Counter messages_launch_tasks;
process::metrics::Counter messages_revive_offers;
process::metrics::Counter messages_reconcile_tasks;
process::metrics::Counter messages_framework_to_executor;
// Messages from slaves.
process::metrics::Counter messages_register_slave;
process::metrics::Counter messages_reregister_slave;
process::metrics::Counter messages_unregister_slave;
process::metrics::Counter messages_status_update;
process::metrics::Counter messages_exited_executor;
// Messages from both schedulers and slaves.
process::metrics::Counter messages_authenticate;
process::metrics::Counter valid_framework_to_executor_messages;
process::metrics::Counter invalid_framework_to_executor_messages;
process::metrics::Counter valid_status_updates;
process::metrics::Counter invalid_status_updates;
process::metrics::Counter valid_status_update_acknowledgements;
process::metrics::Counter invalid_status_update_acknowledgements;
// Recovery counters.
process::metrics::Counter recovery_slave_removals;
// Process metrics.
process::metrics::Gauge event_queue_messages;
process::metrics::Gauge event_queue_dispatches;
process::metrics::Gauge event_queue_http_requests;
// Successful registry operations.
process::metrics::Counter slave_registrations;
process::metrics::Counter slave_reregistrations;
process::metrics::Counter slave_removals;
// Resource metrics.
std::vector<process::metrics::Gauge> resources_total;
std::vector<process::metrics::Gauge> resources_used;
std::vector<process::metrics::Gauge> resources_percent;
} metrics;
// Gauge handlers.
double _uptime_secs()
{
return (process::Clock::now() - startTime).secs();
}
double _elected()
{
return elected() ? 1 : 0;
}
double _slaves_active();
double _slaves_inactive();
double _frameworks_active()
{
return getActiveFrameworks().size();
}
double _frameworks_inactive()
{
return frameworks.registered.size() - _frameworks_active();
}
double _outstanding_offers()
{
return offers.size();
}
double _event_queue_messages()
{
return static_cast<double>(eventCount<process::MessageEvent>());
}
double _event_queue_dispatches()
{
return static_cast<double>(eventCount<process::DispatchEvent>());
}
double _event_queue_http_requests()
{
return static_cast<double>(eventCount<process::HttpEvent>());
}
double _tasks_staging();
double _tasks_starting();
double _tasks_running();
double _resources_total(const std::string& name);
double _resources_used(const std::string& name);
double _resources_percent(const std::string& name);
process::Time startTime; // Start time used to calculate uptime.
Option<process::Time> electedTime; // Time when this master was elected.
// Validates the framework including authorization.
// Returns None if the framework is valid.
// Returns Error if the framework is invalid.
// Returns Failure if authorization returns 'Failure'.
process::Future<Option<Error> > validate(
const FrameworkInfo& frameworkInfo,
const process::UPID& from);
struct BoundedRateLimiter
{
BoundedRateLimiter(double qps, Option<uint64_t> _capacity)
: limiter(new process::RateLimiter(qps)),
capacity(_capacity),
messages(0) {}
process::Owned<process::RateLimiter> limiter;
const Option<uint64_t> capacity;
// Number of outstanding messages for this RateLimiter.
// NOTE: ExitedEvents are throttled but not counted towards
// the capacity here.
uint64_t messages;
};
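// A minimal construction sketch (hypothetical values; in practice
// the qps and optional capacity come from 'flags.rate_limits'):
//
//   process::Owned<BoundedRateLimiter> limiter(
//       new BoundedRateLimiter(10.0, 1000u)); // 10 qps, capacity 1000.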
// BoundedRateLimiters keyed by the framework principal.
// Like Metrics::Frameworks, all frameworks of the same principal
// are throttled together at a common rate limit.
hashmap<std::string, Option<process::Owned<BoundedRateLimiter> > > limiters;
// The default limiter is for frameworks not specified in
// 'flags.rate_limits'.
Option<process::Owned<BoundedRateLimiter> > defaultLimiter;
};
struct Slave
{
Slave(const SlaveInfo& _info,
const SlaveID& _id,
const process::UPID& _pid,
const process::Time& time)
: id(_id),
info(_info),
pid(_pid),
registeredTime(time),
disconnected(false),
observer(NULL) {}
~Slave() {}
Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
{
if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) {
return tasks[frameworkId][taskId];
}
return NULL;
}
void addTask(Task* task)
{
CHECK(!tasks[task->framework_id()].contains(task->task_id()))
<< "Duplicate task " << task->task_id()
<< " of framework " << task->framework_id();
tasks[task->framework_id()][task->task_id()] = task;
LOG(INFO) << "Adding task " << task->task_id()
<< " with resources " << task->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
resourcesInUse += task->resources();
}
void removeTask(Task* task)
{
CHECK(tasks[task->framework_id()].contains(task->task_id()))
<< "Unknown task " << task->task_id()
<< " of framework " << task->framework_id();
tasks[task->framework_id()].erase(task->task_id());
if (tasks[task->framework_id()].empty()) {
tasks.erase(task->framework_id());
}
killedTasks.remove(task->framework_id(), task->task_id());
LOG(INFO) << "Removing task " << task->task_id()
<< " with resources " << task->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
resourcesInUse -= task->resources();
}
void addOffer(Offer* offer)
{
CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
offers.insert(offer);
VLOG(1) << "Adding offer " << offer->id()
<< " with resources " << offer->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
resourcesOffered += offer->resources();
}
void removeOffer(Offer* offer)
{
CHECK(offers.contains(offer)) << "Unknown offer " << offer->id();
offers.erase(offer);
VLOG(1) << "Removing offer " << offer->id()
<< " with resources " << offer->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
resourcesOffered -= offer->resources();
}
bool hasExecutor(const FrameworkID& frameworkId,
const ExecutorID& executorId) const
{
return executors.contains(frameworkId) &&
executors.get(frameworkId).get().contains(executorId);
}
void addExecutor(const FrameworkID& frameworkId,
const ExecutorInfo& executorInfo)
{
CHECK(!hasExecutor(frameworkId, executorInfo.executor_id()))
<< "Duplicate executor " << executorInfo.executor_id()
<< " of framework " << frameworkId;
executors[frameworkId][executorInfo.executor_id()] = executorInfo;
// Update the resources in use to reflect running this executor.
resourcesInUse += executorInfo.resources();
}
void removeExecutor(const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
if (hasExecutor(frameworkId, executorId)) {
// Update the resources in use to reflect removing this executor.
resourcesInUse -= executors[frameworkId][executorId].resources();
executors[frameworkId].erase(executorId);
if (executors[frameworkId].size() == 0) {
executors.erase(frameworkId);
}
}
}
const SlaveID id;
const SlaveInfo info;
process::UPID pid;
process::Time registeredTime;
Option<process::Time> reregisteredTime;
// We mark a slave 'disconnected' when it has checkpointing
// enabled because we expect it to re-register after recovery.
bool disconnected;
Resources resourcesOffered; // Resources offered.
Resources resourcesInUse; // Resources used by tasks and executors.
// Executors running on this slave.
hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors;
// Tasks present on this slave.
// TODO(bmahler): The task pointer ownership complexity arises from the fact
// that we own the pointer here, but it's shared with the Framework struct.
// We should find a way to eliminate this.
hashmap<FrameworkID, hashmap<TaskID, Task*> > tasks;
// Tasks that frameworks have asked to kill.
// This is used for reconciliation when the slave re-registers.
multihashmap<FrameworkID, TaskID> killedTasks;
// Active offers on this slave.
hashset<Offer*> offers;
SlaveObserver* observer;
private:
Slave(const Slave&); // No copying.
Slave& operator = (const Slave&); // No assigning.
};
inline std::ostream& operator << (std::ostream& stream, const Slave& slave)
{
return stream << slave.id << " at " << slave.pid
<< " (" << slave.info.hostname() << ")";
}
// Information about a connected or completed framework.
struct Framework
{
Framework(const FrameworkInfo& _info,
const FrameworkID& _id,
const process::UPID& _pid,
const process::Time& time = process::Clock::now())
: id(_id),
info(_info),
pid(_pid),
active(true),
registeredTime(time),
reregisteredTime(time),
completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
~Framework() {}
Task* getTask(const TaskID& taskId)
{
if (tasks.count(taskId) > 0) {
return tasks[taskId];
} else {
return NULL;
}
}
void addTask(Task* task)
{
CHECK(!tasks.contains(task->task_id()))
<< "Duplicate task " << task->task_id()
<< " of framework " << task->framework_id();
tasks[task->task_id()] = task;
resources += task->resources();
}
void addCompletedTask(const Task& task)
{
// TODO(adam-mesos): Check if completed task already exists.
completedTasks.push_back(memory::shared_ptr<Task>(new Task(task)));
}
void removeTask(Task* task)
{
CHECK(tasks.contains(task->task_id()))
<< "Unknown task " << task->task_id()
<< " of framework " << task->framework_id();
addCompletedTask(*task);
tasks.erase(task->task_id());
resources -= task->resources();
}
void addOffer(Offer* offer)
{
CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
offers.insert(offer);
resources += offer->resources();
}
void removeOffer(Offer* offer)
{
CHECK(offers.find(offer) != offers.end())
<< "Unknown offer " << offer->id();
offers.erase(offer);
resources -= offer->resources();
}
bool hasExecutor(const SlaveID& slaveId,
const ExecutorID& executorId)
{
return executors.contains(slaveId) &&
executors[slaveId].contains(executorId);
}
void addExecutor(const SlaveID& slaveId,
const ExecutorInfo& executorInfo)
{
CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
<< "Duplicate executor " << executorInfo.executor_id()
<< " on slave " << slaveId;
executors[slaveId][executorInfo.executor_id()] = executorInfo;
// Update our resources to reflect running this executor.
resources += executorInfo.resources();
}
void removeExecutor(const SlaveID& slaveId,
const ExecutorID& executorId)
{
if (hasExecutor(slaveId, executorId)) {
// Update our resources to reflect removing this executor.
resources -= executors[slaveId][executorId].resources();
executors[slaveId].erase(executorId);
if (executors[slaveId].size() == 0) {
executors.erase(slaveId);
}
}
}
const FrameworkID id; // TODO(benh): Store this in 'info'.
const FrameworkInfo info;
process::UPID pid;
bool active; // Set to false when the framework is being removed.
process::Time registeredTime;
process::Time reregisteredTime;
process::Time unregisteredTime;
// Tasks that have not yet been launched because they are being
// validated (e.g., authorized).
hashmap<TaskID, TaskInfo> pendingTasks;
hashmap<TaskID, Task*> tasks;
// NOTE: We use a shared pointer for Task because clang doesn't like
// Boost's implementation of circular_buffer with Task (Boost
// attempts to do some memsets which are unsafe).
boost::circular_buffer<memory::shared_ptr<Task> > completedTasks;
hashset<Offer*> offers; // Active offers for framework.
Resources resources; // Total resources (tasks + offers + executors).
hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo> > executors;
private:
Framework(const Framework&); // No copying.
Framework& operator = (const Framework&); // No assigning.
};
// Information about an active role.
struct Role
{
explicit Role(const RoleInfo& _info)
: info(_info) {}
void addFramework(Framework* framework)
{
frameworks[framework->id] = framework;
}
void removeFramework(Framework* framework)
{
frameworks.erase(framework->id);
}
Resources resources() const
{
Resources resources;
foreachvalue (Framework* framework, frameworks) {
resources += framework->resources;
}
return resources;
}
RoleInfo info;
hashmap<FrameworkID, Framework*> frameworks;
};
// Implementation of slave admission Registrar operation.
class AdmitSlave : public Operation
{
public:
explicit AdmitSlave(const SlaveInfo& _info) : info(_info)
{
CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
}
protected:
virtual Try<bool> perform(
Registry* registry,
hashset<SlaveID>* slaveIDs,
bool strict)
{
// Check and see if this slave already exists.
if (slaveIDs->contains(info.id())) {
if (strict) {
return Error("Slave already admitted");
} else {
return false; // No mutation.
}
}
Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
slave->mutable_info()->CopyFrom(info);
slaveIDs->insert(info.id());
return true; // Mutation.
}
private:
const SlaveInfo info;
};
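// NOTE: In non-strict mode, admitting an already-admitted slave is a
// no-op (perform() returns false, i.e. no mutation), whereas in strict
// mode it is an Error; ReadmitSlave and RemoveSlave below follow the
// same strict/non-strict pattern.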
// Implementation of slave readmission Registrar operation.
class ReadmitSlave : public Operation
{
public:
explicit ReadmitSlave(const SlaveInfo& _info) : info(_info)
{
CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
}
protected:
virtual Try<bool> perform(
Registry* registry,
hashset<SlaveID>* slaveIDs,
bool strict)
{
if (slaveIDs->contains(info.id())) {
return false; // No mutation.
}
if (strict) {
return Error("Slave not yet admitted");
} else {
Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
slave->mutable_info()->CopyFrom(info);
slaveIDs->insert(info.id());
return true; // Mutation.
}
}
private:
const SlaveInfo info;
};
// Implementation of slave removal Registrar operation.
class RemoveSlave : public Operation
{
public:
explicit RemoveSlave(const SlaveInfo& _info) : info(_info)
{
CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
}
protected:
virtual Try<bool> perform(
Registry* registry,
hashset<SlaveID>* slaveIDs,
bool strict)
{
for (int i = 0; i < registry->slaves().slaves().size(); i++) {
const Registry::Slave& slave = registry->slaves().slaves(i);
if (slave.info().id() == info.id()) {
registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1);
slaveIDs->erase(info.id());
return true; // Mutation.
}
}
if (strict) {
return Error("Slave not yet admitted");
} else {
return false; // No mutation.
}
}
private:
const SlaveInfo info;
};
} // namespace master {
} // namespace internal {
} // namespace mesos {
#endif // __MASTER_HPP__