blob: abc8728d54c39a3cee8ffb3652cc7d99ff3d915f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
#define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__
#include <set>
#include <string>
#include <mesos/mesos.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include "common/protobuf_utils.hpp"
#include "master/allocator/mesos/allocator.hpp"
#include "master/allocator/mesos/metrics.hpp"
#include "master/allocator/sorter/drf/sorter.hpp"
#include "master/allocator/sorter/random/sorter.hpp"
#include "master/constants.hpp"
namespace mesos {
namespace internal {
namespace master {
namespace allocator {
// The templated hierarchical allocator process is declared (but not yet
// defined) here so that the aliases below can name concrete instantiations
// of it; its definition appears later in this header.
template <
    typename RoleSorter,
    typename FrameworkSorter,
    typename QuotaRoleSorter>
class HierarchicalAllocatorProcess;

// Hierarchical allocator that uses DRF sorters for roles, frameworks and
// quota roles alike.
using HierarchicalDRFAllocatorProcess =
  HierarchicalAllocatorProcess<DRFSorter, DRFSorter, DRFSorter>;

using HierarchicalDRFAllocator =
  MesosAllocator<HierarchicalDRFAllocatorProcess>;

// Hierarchical allocator that uses random sorters at every level.
using HierarchicalRandomAllocatorProcess =
  HierarchicalAllocatorProcess<RandomSorter, RandomSorter, RandomSorter>;

using HierarchicalRandomAllocator =
  MesosAllocator<HierarchicalRandomAllocatorProcess>;
namespace internal {

// Forward declarations.
class OfferFilter;
class InverseOfferFilter;


// Implements the basic allocator algorithm - first pick a role by
// some criteria, then pick one of their frameworks to allocate to.
//
// The sorting criteria are not hard-coded: the constructor receives sorter
// factories, which the templated `HierarchicalAllocatorProcess` wrapper
// later in this header supplies.
class HierarchicalAllocatorProcess : public MesosAllocatorProcess
{
public:
  // Creates the role sorter and the quota role sorter immediately from their
  // factories (the allocator owns them through `process::Owned`) and keeps
  // the framework sorter factory for creating the per-role framework
  // sorters stored in `frameworkSorters`. The allocator starts out
  // uninitialized and paused.
  HierarchicalAllocatorProcess(
      const std::function<Sorter*()>& roleSorterFactory,
      const std::function<Sorter*()>& _frameworkSorterFactory,
      const std::function<Sorter*()>& quotaRoleSorterFactory)
    : initialized(false),
      paused(true),
      metrics(*this),
      roleSorter(roleSorterFactory()),
      quotaRoleSorter(quotaRoleSorterFactory()),
      frameworkSorterFactory(_frameworkSorterFactory) {}

  virtual ~HierarchicalAllocatorProcess() {}

  // Returns this process's PID typed as `HierarchicalAllocatorProcess`, so
  // that dispatch/delay/defer can target this class's member functions.
  process::PID<HierarchicalAllocatorProcess> self() const
  {
    return process::PID<Self>(this);
  }

  // NOTE(review): the operations below presumably implement the
  // `MesosAllocatorProcess` interface from "allocator.hpp"; confirm the
  // signatures against the base class before changing any of them.

  void initialize(
      const Duration& allocationInterval,
      const lambda::function<
          void(const FrameworkID&,
               const hashmap<std::string, hashmap<SlaveID, Resources>>&)>&
        offerCallback,
      const lambda::function<
          void(const FrameworkID&,
               const hashmap<SlaveID, UnavailableResources>&)>&
        inverseOfferCallback,
      const Option<std::set<std::string>>&
        fairnessExcludeResourceNames = None(),
      bool filterGpuResources = true,
      const Option<DomainInfo>& domain = None(),
      const Option<std::vector<mesos::internal::ResourceQuantities>>&
        minAllocatableResources = None());

  void recover(
      const int _expectedAgentCount,
      const hashmap<std::string, Quota>& quotas);

  void addFramework(
      const FrameworkID& frameworkId,
      const FrameworkInfo& frameworkInfo,
      const hashmap<SlaveID, Resources>& used,
      bool active,
      const std::set<std::string>& suppressedRoles);

  void removeFramework(
      const FrameworkID& frameworkId);

  void activateFramework(
      const FrameworkID& frameworkId);

  void deactivateFramework(
      const FrameworkID& frameworkId);

  void updateFramework(
      const FrameworkID& frameworkId,
      const FrameworkInfo& frameworkInfo,
      const std::set<std::string>& suppressedRoles);

  void addSlave(
      const SlaveID& slaveId,
      const SlaveInfo& slaveInfo,
      const std::vector<SlaveInfo::Capability>& capabilities,
      const Option<Unavailability>& unavailability,
      const Resources& total,
      const hashmap<FrameworkID, Resources>& used);

  void removeSlave(
      const SlaveID& slaveId);

  void updateSlave(
      const SlaveID& slave,
      const Option<Resources>& total = None(),
      const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());

  void deactivateSlave(
      const SlaveID& slaveId);

  void activateSlave(
      const SlaveID& slaveId);

  void updateWhitelist(
      const Option<hashset<std::string>>& whitelist);

  void requestResources(
      const FrameworkID& frameworkId,
      const std::vector<Request>& requests);

  void updateAllocation(
      const FrameworkID& frameworkId,
      const SlaveID& slaveId,
      const Resources& offeredResources,
      const std::vector<Offer::Operation>& operations);

  process::Future<Nothing> updateAvailable(
      const SlaveID& slaveId,
      const std::vector<Offer::Operation>& operations);

  void updateUnavailability(
      const SlaveID& slaveId,
      const Option<Unavailability>& unavailability);

  void updateInverseOffer(
      const SlaveID& slaveId,
      const FrameworkID& frameworkId,
      const Option<UnavailableResources>& unavailableResources,
      const Option<mesos::allocator::InverseOfferStatus>& status,
      const Option<Filters>& filters);

  process::Future<
      hashmap<SlaveID,
              hashmap<FrameworkID, mesos::allocator::InverseOfferStatus>>>
    getInverseOfferStatuses();

  void recoverResources(
      const FrameworkID& frameworkId,
      const SlaveID& slaveId,
      const Resources& resources,
      const Option<Filters>& filters);

  void suppressOffers(
      const FrameworkID& frameworkId,
      const std::set<std::string>& roles);

  void reviveOffers(
      const FrameworkID& frameworkId,
      const std::set<std::string>& roles);

  void setQuota(
      const std::string& role,
      const Quota& quota);

  void removeQuota(
      const std::string& role);

  void updateWeights(
      const std::vector<WeightInfo>& weightInfos);

protected:
  // Useful typedefs for dispatch/delay/defer to self()/this.
  typedef HierarchicalAllocatorProcess Self;
  typedef HierarchicalAllocatorProcess This;

  // Idempotent helpers for pausing and resuming allocation.
  void pause();
  void resume();

  // Allocate any allocatable resources from all known agents.
  process::Future<Nothing> allocate();

  // Allocate resources from the specified agent.
  process::Future<Nothing> allocate(const SlaveID& slaveId);

  // Allocate resources from the specified agents. The allocation
  // is deferred and batched with other allocation requests.
  process::Future<Nothing> allocate(const hashset<SlaveID>& slaveIds);

  // Method that performs allocation work.
  Nothing _allocate();

  // Helper for `_allocate()` that allocates resources for offers.
  void __allocate();

  // Helper for `_allocate()` that deallocates resources for inverse offers.
  void deallocate();

  // Remove an offer filter for the specified role of the framework.
  void expire(
      const FrameworkID& frameworkId,
      const std::string& role,
      const SlaveID& slaveId,
      OfferFilter* offerFilter);

  void _expire(
      const FrameworkID& frameworkId,
      const std::string& role,
      const SlaveID& slaveId,
      OfferFilter* offerFilter);

  // Remove an inverse offer filter for the specified framework.
  void expire(
      const FrameworkID& frameworkId,
      const SlaveID& slaveId,
      InverseOfferFilter* inverseOfferFilter);

  // Checks whether the slave is whitelisted.
  bool isWhitelisted(const SlaveID& slaveId) const;

  // Returns true if there is a resource offer filter for the
  // specified role of this framework on this slave.
  bool isFiltered(
      const FrameworkID& frameworkId,
      const std::string& role,
      const SlaveID& slaveId,
      const Resources& resources) const;

  // Returns true if there is an inverse offer filter for this framework
  // on this slave.
  bool isFiltered(
      const FrameworkID& frameworkID,
      const SlaveID& slaveID) const;

  bool allocatable(const Resources& resources);

  bool initialized;
  bool paused;

  // Recovery data.
  Option<int> expectedAgentCount;

  Duration allocationInterval;

  lambda::function<
      void(const FrameworkID&,
           const hashmap<std::string, hashmap<SlaveID, Resources>>&)>
    offerCallback;

  lambda::function<
      void(const FrameworkID&,
           const hashmap<SlaveID, UnavailableResources>&)>
    inverseOfferCallback;

  friend Metrics;
  Metrics metrics;

  struct Framework
  {
    explicit Framework(
        const FrameworkInfo& frameworkInfo,
        const std::set<std::string>& _suppressedRoles);

    std::set<std::string> roles;

    std::set<std::string> suppressedRoles;

    protobuf::framework::Capabilities capabilities;

    // Active offer and inverse offer filters for the framework.
    // Offer filters are tied to the role the filtered resources
    // were allocated to.
    hashmap<std::string, hashmap<SlaveID, hashset<OfferFilter*>>> offerFilters;
    hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
  };

  double _event_queue_dispatches()
  {
    return static_cast<double>(eventCount<process::DispatchEvent>());
  }

  double _resources_total(
      const std::string& resource);

  double _resources_offered_or_allocated(
      const std::string& resource);

  double _quota_allocated(
      const std::string& role,
      const std::string& resource);

  double _offer_filters_active(
      const std::string& role);

  hashmap<FrameworkID, Framework> frameworks;

  class Slave
  {
  public:
    Slave(
        const std::string& _hostname,
        const protobuf::slave::Capabilities& _capabilities,
        bool _activated,
        const Resources& _total,
        const Resources& _allocated)
      : hostname(_hostname),
        capabilities(_capabilities),
        activated(_activated),
        total(_total),
        allocated(_allocated)
    {
      // Derive `available` through the same helper the mutators use, so
      // the `available = total - allocated` rule is defined in one place.
      updateAvailable();
    }

    Resources getTotal() const { return total; }

    Resources getAllocated() const { return allocated; }

    Resources getAvailable() const { return available; }

    // Replaces the agent's total resources and recomputes `available`.
    void updateTotal(const Resources& newTotal)
    {
      total = newTotal;
      updateAvailable();
    }

    // Adds `toAllocate` to the allocated resources and recomputes
    // `available`.
    void allocate(const Resources& toAllocate)
    {
      allocated += toAllocate;
      updateAvailable();
    }

    // Subtracts `toUnallocate` from the allocated resources and recomputes
    // `available`.
    void unallocate(const Resources& toUnallocate)
    {
      allocated -= toUnallocate;
      updateAvailable();
    }

    std::string hostname;

    protobuf::slave::Capabilities capabilities;

    bool activated; // Whether to offer resources.

    Option<DomainInfo> domain;

    // Represents a scheduled unavailability due to maintenance for a specific
    // slave, and the responses from frameworks as to whether they will be able
    // to gracefully handle this unavailability.
    //
    // NOTE: We currently implement maintenance in the allocator to be able to
    // leverage state and features such as the FrameworkSorter and OfferFilter.
    struct Maintenance
    {
      Maintenance(const Unavailability& _unavailability)
        : unavailability(_unavailability) {}

      // The start time and optional duration of the event.
      Unavailability unavailability;

      // A mapping of frameworks to the inverse offer status associated with
      // this unavailability.
      //
      // NOTE: We currently lose this information during a master failover
      // since it is not persisted or replicated. This is ok as the new master's
      // allocator will send out new inverse offers and re-collect the
      // information. This is similar to all the outstanding offers from an old
      // master being invalidated, and new offers being sent out.
      hashmap<FrameworkID, mesos::allocator::InverseOfferStatus> statuses;

      // Represents the "unit of accounting" for maintenance. When a
      // `FrameworkID` is present in the hashset it means an inverse offer has
      // been sent out. When it is not present it means no offer is currently
      // outstanding.
      hashset<FrameworkID> offersOutstanding;
    };

    // When the `maintenance` is set the slave is scheduled to be unavailable at
    // a given point in time, for an optional duration. This information is used
    // to send out `InverseOffers`.
    Option<Maintenance> maintenance;

  private:
    // Recomputes `available` from the current `total` and `allocated`.
    void updateAvailable()
    {
      // In order to subtract from the total,
      // we strip the allocation information.
      Resources allocated_ = allocated;
      allocated_.unallocate();
      available = total - allocated_;
    }

    // Total amount of regular *and* oversubscribed resources.
    Resources total;

    // Regular *and* oversubscribed resources that are allocated.
    //
    // NOTE: We maintain multiple copies of each shared resource allocated
    // to a slave, where the number of copies represents the number of times
    // this shared resource has been allocated to (and has not been recovered
    // from) a specific framework.
    //
    // NOTE: We keep track of the slave's allocated resources despite
    // having that information in sorters. This is because the
    // information in sorters is not accurate if some framework
    // hasn't reregistered. See MESOS-2919 for details.
    Resources allocated;

    // We track the total and allocated resources on the slave, the
    // available resources are computed as follows:
    //
    //   available = total - allocated
    //
    // Note that it's possible for the slave to be over-allocated!
    // In this case, allocated > total.
    Resources available;
  };

  hashmap<SlaveID, Slave> slaves;

  // A set of agents that are kept as allocation candidates. Events
  // may add or remove candidates to the set. When an allocation is
  // processed, the set of candidates is cleared.
  hashset<SlaveID> allocationCandidates;

  // Future for the dispatched allocation that becomes
  // ready after the allocation run is complete.
  Option<process::Future<Nothing>> allocation;

  // We track information about roles that we're aware of in the system.
  // Specifically, we keep track of the roles when a framework subscribes to
  // the role, and/or when there are resources allocated to the role
  // (e.g. some tasks and/or executors are consuming resources under the role).
  hashmap<std::string, hashset<FrameworkID>> roles;

  // Configured quota for each role, if any. Setting quota for a role
  // changes the order that the role's frameworks are offered
  // resources. Quota comes before fair share, hence setting quota moves
  // the role's frameworks towards the front of the allocation queue.
  //
  // NOTE: We currently associate quota with roles, but this may
  // change in the future.
  hashmap<std::string, Quota> quotas;

  // Aggregated resource reservations on all agents tied to a
  // particular role, if any. These are stripped scalar quantities
  // that contain no meta-data. Used for accounting resource
  // reservations for quota limit.
  //
  // Only roles with non-empty reservations will be stored in the map.
  hashmap<std::string, Resources> reservationScalarQuantities;

  // Slaves to send offers for.
  Option<hashset<std::string>> whitelist;

  // Resources (by name) that will be excluded from a role's fair share.
  Option<std::set<std::string>> fairnessExcludeResourceNames;

  // Filter GPU resources based on the `GPU_RESOURCES` framework capability.
  //
  // Unlike `initialized` and `paused`, the constructor does not set this
  // flag; the in-class initializer (which mirrors the default argument of
  // `initialize()`) keeps it well-defined until `initialize()` presumably
  // assigns the configured value.
  bool filterGpuResources = true;

  // The master's domain, if any.
  Option<DomainInfo> domain;

  // The minimum allocatable resources, if any.
  Option<std::vector<mesos::internal::ResourceQuantities>>
    minAllocatableResources;

  // There are two stages of allocation. During the first stage resources
  // are allocated only to frameworks in roles with quota set. During the
  // second stage remaining resources that would not be required to satisfy
  // un-allocated quota are then allocated to all frameworks.
  //
  // Each stage comprises two levels of sorting, hence "hierarchical".
  // Level 1 sorts across roles:
  //   Currently, only the allocated portion of the reserved resources is
  //   accounted for in the fairness calculation.
  //
  // TODO(mpark): Reserved resources should be accounted for fairness
  // calculation whether they are allocated or not, since they model a long or
  // forever running task. That is, the effect of reserving resources is
  // equivalent to launching a task in that the resources that make up the
  // reservation are not available to other roles as non-revocable.
  //
  // Level 2 sorts across frameworks within a particular role:
  //   Reserved resources at this level are, and should be, accounted for in
  //   the fairness calculation only if they are allocated. This is because
  //   reserved resources are fairly shared across the frameworks in the role.
  //
  // The allocator relies on `Sorter`s to employ a particular sorting
  // algorithm. Each level has its own sorter and hence may have different
  // fairness calculations.
  //
  // NOTE: The hierarchical allocator considers revocable resources as
  // regular resources when doing fairness calculations.
  //
  // TODO(vinod): Consider using a different fairness algorithm for
  // revocable resources.

  // A sorter for active roles. This sorter determines the order in which
  // roles are allocated resources during Level 1 of the second stage.
  process::Owned<Sorter> roleSorter;

  // A dedicated sorter for roles for which quota is set. This sorter
  // determines the order in which quota'ed roles are allocated resources
  // during Level 1 of the first stage. Quota'ed roles have resources
  // allocated up to their allotted quota (the first stage) prior to
  // non-quota'ed roles (the second stage).
  //
  // NOTE: A role appears in `quotaRoleSorter` if it has a quota (even if
  // no frameworks are currently registered in that role). In contrast,
  // `roleSorter` only contains entries for roles with one or more
  // registered frameworks.
  //
  // NOTE: We do not include revocable resources in the quota role sorter,
  // because the quota role sorter's job is to perform fair sharing between
  // the quota roles as it pertains to their level of quota satisfaction.
  // Since revocable resources do not increase a role's level of satisfaction
  // toward its quota, we choose to exclude them from the quota role sorter.
  process::Owned<Sorter> quotaRoleSorter;

  // A collection of sorters, one per active role. Each sorter determines
  // the order in which frameworks that belong to the same role are allocated
  // resources inside the role's share. These sorters are used during Level 2
  // for both the first and the second stages.
  hashmap<std::string, process::Owned<Sorter>> frameworkSorters;

  // Factory function for framework sorters.
  const std::function<Sorter*()> frameworkSorterFactory;

private:
  bool isFrameworkTrackedUnderRole(
      const FrameworkID& frameworkId,
      const std::string& role) const;

  void trackFrameworkUnderRole(
      const FrameworkID& frameworkId,
      const std::string& role);

  void untrackFrameworkUnderRole(
      const FrameworkID& frameworkId,
      const std::string& role);

  // `trackReservations` and `untrackReservations` are helpers
  // to track role resource reservations. We need to keep
  // track of reservations to enforce role quota limit
  // in the presence of unallocated reservations. See MESOS-4527.
  //
  // TODO(mzhu): Ideally, we want these helpers to instead track the
  // reservations as *allocated* in the sorters even when the
  // reservations have not been allocated yet. This will help to:
  //
  // (1) Solve the fairness issue when roles with unallocated
  //     reservations may game the allocator (See MESOS-8299).
  //
  // (2) Simplify the quota enforcement logic -- the allocator
  //     would no longer need to track reservations separately.
  void trackReservations(
      const hashmap<std::string, Resources>& reservations);

  void untrackReservations(
      const hashmap<std::string, Resources>& reservations);

  // Helper to update the agent's total resources maintained in the allocator
  // and the role and quota sorters (whose total resources match the agent's
  // total resources). Returns true iff the stored agent total was changed.
  bool updateSlaveTotal(const SlaveID& slaveId, const Resources& total);

  // Helper that returns true if the given agent is located in a
  // different region than the master. This can only be the case if
  // the agent and the master are both configured with a fault domain.
  bool isRemoteSlave(const Slave& slave) const;

  // Helper to track allocated resources on an agent.
  void trackAllocatedResources(
      const SlaveID& slaveId,
      const FrameworkID& frameworkId,
      const Resources& allocated);

  // Helper to untrack resources that are no longer allocated on an agent.
  void untrackAllocatedResources(
      const SlaveID& slaveId,
      const FrameworkID& frameworkId,
      const Resources& allocated);

  // Helper that removes all existing offer filters for the given slave
  // id.
  void removeFilters(const SlaveID& slaveId);
};

} // namespace internal {
// Adapts the sorter template parameters to the non-template implementation
// in the `internal` namespace. That implementation only ever receives sorter
// factories, which is what lets it live in the implementation file rather
// than in this header.
template <
    typename RoleSorter,
    typename FrameworkSorter,
    typename QuotaRoleSorter>
class HierarchicalAllocatorProcess
  : public internal::HierarchicalAllocatorProcess
{
public:
  HierarchicalAllocatorProcess()
    : ProcessBase(process::ID::generate("hierarchical-allocator")),
      internal::HierarchicalAllocatorProcess(
          // Only the role sorter takes constructor arguments: this process's
          // PID and the "allocator/mesos/roles/" string (presumably a
          // metrics prefix; confirm in the sorter implementation).
          [this]() -> Sorter* {
            return new RoleSorter(this->self(), "allocator/mesos/roles/");
          },
          newFrameworkSorter,
          newQuotaRoleSorter) {}

private:
  // Value-initializes a framework sorter; the caller owns the result.
  static Sorter* newFrameworkSorter() { return new FrameworkSorter(); }

  // Value-initializes a quota role sorter; the caller owns the result.
  static Sorter* newQuotaRoleSorter() { return new QuotaRoleSorter(); }
};
} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {
#endif // __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__