// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| |
| #ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ |
| #define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ |
| |
| #include <set> |
| #include <string> |
| |
| #include <mesos/mesos.hpp> |
| |
| #include <process/future.hpp> |
| #include <process/id.hpp> |
| #include <process/owned.hpp> |
| |
| #include <stout/duration.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/option.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "master/allocator/mesos/allocator.hpp" |
| #include "master/allocator/mesos/metrics.hpp" |
| |
| #include "master/allocator/sorter/drf/sorter.hpp" |
| #include "master/allocator/sorter/random/sorter.hpp" |
| |
| #include "master/constants.hpp" |
| |
| namespace mesos { |
| namespace internal { |
| namespace master { |
| namespace allocator { |
| |
// Forward declaration of the templated hierarchical allocator process.
// It lets instantiations of the template (e.g. one using DRF sorters
// for all three parameters) be named before the template is defined.
template <
    typename RoleSorter,
    typename FrameworkSorter,
    typename QuotaRoleSorter>
class HierarchicalAllocatorProcess;
| |
| typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter, DRFSorter> |
| HierarchicalDRFAllocatorProcess; |
| |
| typedef MesosAllocator<HierarchicalDRFAllocatorProcess> |
| HierarchicalDRFAllocator; |
| |
| typedef HierarchicalAllocatorProcess<RandomSorter, RandomSorter, RandomSorter> |
| HierarchicalRandomAllocatorProcess; |
| |
| typedef MesosAllocator<HierarchicalRandomAllocatorProcess> |
| HierarchicalRandomAllocator; |
| |
| |
| namespace internal { |
| |
// Filter types that are only declared, not defined, in this header.
class InverseOfferFilter;
class OfferFilter;
| |
| |
| // Implements the basic allocator algorithm - first pick a role by |
| // some criteria, then pick one of their frameworks to allocate to. |
| class HierarchicalAllocatorProcess : public MesosAllocatorProcess |
| { |
| public: |
| HierarchicalAllocatorProcess( |
| const std::function<Sorter*()>& roleSorterFactory, |
| const std::function<Sorter*()>& _frameworkSorterFactory, |
| const std::function<Sorter*()>& quotaRoleSorterFactory) |
| : initialized(false), |
| paused(true), |
| metrics(*this), |
| roleSorter(roleSorterFactory()), |
| quotaRoleSorter(quotaRoleSorterFactory()), |
| frameworkSorterFactory(_frameworkSorterFactory) {} |
| |
| virtual ~HierarchicalAllocatorProcess() {} |
| |
| process::PID<HierarchicalAllocatorProcess> self() const |
| { |
| return process::PID<Self>(this); |
| } |
| |
| void initialize( |
| const Duration& allocationInterval, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<std::string, hashmap<SlaveID, Resources>>&)>& |
| offerCallback, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<SlaveID, UnavailableResources>&)>& |
| inverseOfferCallback, |
| const Option<std::set<std::string>>& |
| fairnessExcludeResourceNames = None(), |
| bool filterGpuResources = true, |
| const Option<DomainInfo>& domain = None(), |
| const Option<std::vector<mesos::internal::ResourceQuantities>>& |
| minAllocatableResources = None()); |
| |
| void recover( |
| const int _expectedAgentCount, |
| const hashmap<std::string, Quota>& quotas); |
| |
| void addFramework( |
| const FrameworkID& frameworkId, |
| const FrameworkInfo& frameworkInfo, |
| const hashmap<SlaveID, Resources>& used, |
| bool active, |
| const std::set<std::string>& suppressedRoles); |
| |
| void removeFramework( |
| const FrameworkID& frameworkId); |
| |
| void activateFramework( |
| const FrameworkID& frameworkId); |
| |
| void deactivateFramework( |
| const FrameworkID& frameworkId); |
| |
| void updateFramework( |
| const FrameworkID& frameworkId, |
| const FrameworkInfo& frameworkInfo, |
| const std::set<std::string>& suppressedRoles); |
| |
| void addSlave( |
| const SlaveID& slaveId, |
| const SlaveInfo& slaveInfo, |
| const std::vector<SlaveInfo::Capability>& capabilities, |
| const Option<Unavailability>& unavailability, |
| const Resources& total, |
| const hashmap<FrameworkID, Resources>& used); |
| |
| void removeSlave( |
| const SlaveID& slaveId); |
| |
| void updateSlave( |
| const SlaveID& slave, |
| const Option<Resources>& total = None(), |
| const Option<std::vector<SlaveInfo::Capability>>& capabilities = None()); |
| |
| void deactivateSlave( |
| const SlaveID& slaveId); |
| |
| void activateSlave( |
| const SlaveID& slaveId); |
| |
| void updateWhitelist( |
| const Option<hashset<std::string>>& whitelist); |
| |
| void requestResources( |
| const FrameworkID& frameworkId, |
| const std::vector<Request>& requests); |
| |
| void updateAllocation( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const Resources& offeredResources, |
| const std::vector<Offer::Operation>& operations); |
| |
| process::Future<Nothing> updateAvailable( |
| const SlaveID& slaveId, |
| const std::vector<Offer::Operation>& operations); |
| |
| void updateUnavailability( |
| const SlaveID& slaveId, |
| const Option<Unavailability>& unavailability); |
| |
| void updateInverseOffer( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Option<UnavailableResources>& unavailableResources, |
| const Option<mesos::allocator::InverseOfferStatus>& status, |
| const Option<Filters>& filters); |
| |
| process::Future< |
| hashmap<SlaveID, |
| hashmap<FrameworkID, mesos::allocator::InverseOfferStatus>>> |
| getInverseOfferStatuses(); |
| |
| void recoverResources( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const Resources& resources, |
| const Option<Filters>& filters); |
| |
| void suppressOffers( |
| const FrameworkID& frameworkId, |
| const std::set<std::string>& roles); |
| |
| void reviveOffers( |
| const FrameworkID& frameworkId, |
| const std::set<std::string>& roles); |
| |
| void setQuota( |
| const std::string& role, |
| const Quota& quota); |
| |
| void removeQuota( |
| const std::string& role); |
| |
| void updateWeights( |
| const std::vector<WeightInfo>& weightInfos); |
| |
| protected: |
| // Useful typedefs for dispatch/delay/defer to self()/this. |
| typedef HierarchicalAllocatorProcess Self; |
| typedef HierarchicalAllocatorProcess This; |
| |
| // Idempotent helpers for pausing and resuming allocation. |
| void pause(); |
| void resume(); |
| |
| // Allocate any allocatable resources from all known agents. |
| process::Future<Nothing> allocate(); |
| |
| // Allocate resources from the specified agent. |
| process::Future<Nothing> allocate(const SlaveID& slaveId); |
| |
| // Allocate resources from the specified agents. The allocation |
| // is deferred and batched with other allocation requests. |
| process::Future<Nothing> allocate(const hashset<SlaveID>& slaveIds); |
| |
| // Method that performs allocation work. |
| Nothing _allocate(); |
| |
| // Helper for `_allocate()` that allocates resources for offers. |
| void __allocate(); |
| |
| // Helper for `_allocate()` that deallocates resources for inverse offers. |
| void deallocate(); |
| |
| // Remove an offer filter for the specified role of the framework. |
| void expire( |
| const FrameworkID& frameworkId, |
| const std::string& role, |
| const SlaveID& slaveId, |
| OfferFilter* offerFilter); |
| |
| void _expire( |
| const FrameworkID& frameworkId, |
| const std::string& role, |
| const SlaveID& slaveId, |
| OfferFilter* offerFilter); |
| |
| // Remove an inverse offer filter for the specified framework. |
| void expire( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| InverseOfferFilter* inverseOfferFilter); |
| |
| // Checks whether the slave is whitelisted. |
| bool isWhitelisted(const SlaveID& slaveId) const; |
| |
| // Returns true if there is a resource offer filter for the |
| // specified role of this framework on this slave. |
| bool isFiltered( |
| const FrameworkID& frameworkId, |
| const std::string& role, |
| const SlaveID& slaveId, |
| const Resources& resources) const; |
| |
| // Returns true if there is an inverse offer filter for this framework |
| // on this slave. |
| bool isFiltered( |
| const FrameworkID& frameworkID, |
| const SlaveID& slaveID) const; |
| |
| bool allocatable(const Resources& resources); |
| |
| bool initialized; |
| bool paused; |
| |
| // Recovery data. |
| Option<int> expectedAgentCount; |
| |
| Duration allocationInterval; |
| |
| lambda::function< |
| void(const FrameworkID&, |
| const hashmap<std::string, hashmap<SlaveID, Resources>>&)> |
| offerCallback; |
| |
| lambda::function< |
| void(const FrameworkID&, |
| const hashmap<SlaveID, UnavailableResources>&)> |
| inverseOfferCallback; |
| |
| friend Metrics; |
| Metrics metrics; |
| |
| struct Framework |
| { |
| explicit Framework( |
| const FrameworkInfo& frameworkInfo, |
| const std::set<std::string>& _suppressedRoles); |
| |
| std::set<std::string> roles; |
| |
| std::set<std::string> suppressedRoles; |
| |
| protobuf::framework::Capabilities capabilities; |
| |
| // Active offer and inverse offer filters for the framework. |
| // Offer filters are tied to the role the filtered resources |
| // were allocated to. |
| hashmap<std::string, hashmap<SlaveID, hashset<OfferFilter*>>> offerFilters; |
| hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters; |
| }; |
| |
| double _event_queue_dispatches() |
| { |
| return static_cast<double>(eventCount<process::DispatchEvent>()); |
| } |
| |
| double _resources_total( |
| const std::string& resource); |
| |
| double _resources_offered_or_allocated( |
| const std::string& resource); |
| |
| double _quota_allocated( |
| const std::string& role, |
| const std::string& resource); |
| |
| double _offer_filters_active( |
| const std::string& role); |
| |
| hashmap<FrameworkID, Framework> frameworks; |
| |
| class Slave |
| { |
| public: |
| Slave( |
| const std::string& _hostname, |
| const protobuf::slave::Capabilities& _capabilities, |
| bool _activated, |
| const Resources& _total, |
| const Resources& _allocated) |
| : hostname(_hostname), |
| capabilities(_capabilities), |
| activated(_activated), |
| total(_total), |
| allocated(_allocated) |
| { |
| // In order to subtract from the total, |
| // we strip the allocation information. |
| Resources allocated_ = allocated; |
| allocated_.unallocate(); |
| |
| available = total - allocated_; |
| } |
| |
| Resources getTotal() const { return total; } |
| |
| Resources getAllocated() const { return allocated; } |
| |
| Resources getAvailable() const { return available; } |
| |
| void updateTotal(const Resources& newTotal) { |
| total = newTotal; |
| |
| updateAvailable(); |
| } |
| |
| void allocate(const Resources& toAllocate) |
| { |
| allocated += toAllocate; |
| |
| updateAvailable(); |
| } |
| |
| void unallocate(const Resources& toUnallocate) |
| { |
| allocated -= toUnallocate; |
| |
| updateAvailable(); |
| } |
| |
| std::string hostname; |
| |
| protobuf::slave::Capabilities capabilities; |
| |
| bool activated; // Whether to offer resources. |
| |
| Option<DomainInfo> domain; |
| |
| // Represents a scheduled unavailability due to maintenance for a specific |
| // slave, and the responses from frameworks as to whether they will be able |
| // to gracefully handle this unavailability. |
| // |
| // NOTE: We currently implement maintenance in the allocator to be able to |
| // leverage state and features such as the FrameworkSorter and OfferFilter. |
| struct Maintenance |
| { |
| Maintenance(const Unavailability& _unavailability) |
| : unavailability(_unavailability) {} |
| |
| // The start time and optional duration of the event. |
| Unavailability unavailability; |
| |
| // A mapping of frameworks to the inverse offer status associated with |
| // this unavailability. |
| // |
| // NOTE: We currently lose this information during a master fail over |
| // since it is not persisted or replicated. This is ok as the new master's |
| // allocator will send out new inverse offers and re-collect the |
| // information. This is similar to all the outstanding offers from an old |
| // master being invalidated, and new offers being sent out. |
| hashmap<FrameworkID, mesos::allocator::InverseOfferStatus> statuses; |
| |
| // Represents the "unit of accounting" for maintenance. When a |
| // `FrameworkID` is present in the hashset it means an inverse offer has |
| // been sent out. When it is not present it means no offer is currently |
| // outstanding. |
| hashset<FrameworkID> offersOutstanding; |
| }; |
| |
| // When the `maintenance` is set the slave is scheduled to be unavailable at |
| // a given point in time, for an optional duration. This information is used |
| // to send out `InverseOffers`. |
| Option<Maintenance> maintenance; |
| |
| private: |
| void updateAvailable() { |
| // In order to subtract from the total, |
| // we strip the allocation information. |
| Resources allocated_ = allocated; |
| allocated_.unallocate(); |
| |
| available = total - allocated_; |
| } |
| |
| // Total amount of regular *and* oversubscribed resources. |
| Resources total; |
| |
| // Regular *and* oversubscribed resources that are allocated. |
| // |
| // NOTE: We maintain multiple copies of each shared resource allocated |
| // to a slave, where the number of copies represents the number of times |
| // this shared resource has been allocated to (and has not been recovered |
| // from) a specific framework. |
| // |
| // NOTE: We keep track of the slave's allocated resources despite |
| // having that information in sorters. This is because the |
| // information in sorters is not accurate if some framework |
| // hasn't reregistered. See MESOS-2919 for details. |
| Resources allocated; |
| |
| // We track the total and allocated resources on the slave, the |
| // available resources are computed as follows: |
| // |
| // available = total - allocated |
| // |
| // Note that it's possible for the slave to be over-allocated! |
| // In this case, allocated > total. |
| Resources available; |
| }; |
| |
| hashmap<SlaveID, Slave> slaves; |
| |
| // A set of agents that are kept as allocation candidates. Events |
| // may add or remove candidates to the set. When an allocation is |
| // processed, the set of candidates is cleared. |
| hashset<SlaveID> allocationCandidates; |
| |
| // Future for the dispatched allocation that becomes |
| // ready after the allocation run is complete. |
| Option<process::Future<Nothing>> allocation; |
| |
| // We track information about roles that we're aware of in the system. |
| // Specifically, we keep track of the roles when a framework subscribes to |
| // the role, and/or when there are resources allocated to the role |
| // (e.g. some tasks and/or executors are consuming resources under the role). |
| hashmap<std::string, hashset<FrameworkID>> roles; |
| |
| // Configured quota for each role, if any. Setting quota for a role |
| // changes the order that the role's frameworks are offered |
| // resources. Quota comes before fair share, hence setting quota moves |
| // the role's frameworks towards the front of the allocation queue. |
| // |
| // NOTE: We currently associate quota with roles, but this may |
| // change in the future. |
| hashmap<std::string, Quota> quotas; |
| |
| // Aggregated resource reservations on all agents tied to a |
| // particular role, if any. These are stripped scalar quantities |
| // that contain no meta-data. Used for accounting resource |
| // reservations for quota limit. |
| // |
| // Only roles with non-empty reservations will be stored in the map. |
| hashmap<std::string, Resources> reservationScalarQuantities; |
| |
| // Slaves to send offers for. |
| Option<hashset<std::string>> whitelist; |
| |
| // Resources (by name) that will be excluded from a role's fair share. |
| Option<std::set<std::string>> fairnessExcludeResourceNames; |
| |
| // Filter GPU resources based on the `GPU_RESOURCES` framework capability. |
| bool filterGpuResources; |
| |
| // The master's domain, if any. |
| Option<DomainInfo> domain; |
| |
| // The minimum allocatable resources, if any. |
| Option<std::vector<mesos::internal::ResourceQuantities>> |
| minAllocatableResources; |
| |
| // There are two stages of allocation. During the first stage resources |
| // are allocated only to frameworks in roles with quota set. During the |
| // second stage remaining resources that would not be required to satisfy |
| // un-allocated quota are then allocated to all frameworks. |
| // |
| // Each stage comprises two levels of sorting, hence "hierarchical". |
| // Level 1 sorts across roles: |
| // Currently, only the allocated portion of the reserved resources are |
| // accounted for fairness calculation. |
| // |
| // TODO(mpark): Reserved resources should be accounted for fairness |
| // calculation whether they are allocated or not, since they model a long or |
| // forever running task. That is, the effect of reserving resources is |
| // equivalent to launching a task in that the resources that make up the |
| // reservation are not available to other roles as non-revocable. |
| // |
| // Level 2 sorts across frameworks within a particular role: |
| // Reserved resources at this level are, and should be accounted for |
| // fairness calculation only if they are allocated. This is because |
| // reserved resources are fairly shared across the frameworks in the role. |
| // |
| // The allocator relies on `Sorter`s to employ a particular sorting |
| // algorithm. Each level has its own sorter and hence may have different |
| // fairness calculations. |
| // |
| // NOTE: The hierarchical allocator considers revocable resources as |
| // regular resources when doing fairness calculations. |
| // |
| // TODO(vinod): Consider using a different fairness algorithm for |
| // revocable resources. |
| |
| // A sorter for active roles. This sorter determines the order in which |
| // roles are allocated resources during Level 1 of the second stage. |
| process::Owned<Sorter> roleSorter; |
| |
| // A dedicated sorter for roles for which quota is set. This sorter |
| // determines the order in which quota'ed roles are allocated resources |
| // during Level 1 of the first stage. Quota'ed roles have resources |
| // allocated up to their alloted quota (the first stage) prior to |
| // non-quota'ed roles (the second stage). |
| // |
| // NOTE: A role appears in `quotaRoleSorter` if it has a quota (even if |
| // no frameworks are currently registered in that role). In contrast, |
| // `roleSorter` only contains entries for roles with one or more |
| // registered frameworks. |
| // |
| // NOTE: We do not include revocable resources in the quota role sorter, |
| // because the quota role sorter's job is to perform fair sharing between |
| // the quota roles as it pertains to their level of quota satisfaction. |
| // Since revocable resources do not increase a role's level of satisfaction |
| // toward its quota, we choose to exclude them from the quota role sorter. |
| process::Owned<Sorter> quotaRoleSorter; |
| |
| // A collection of sorters, one per active role. Each sorter determines |
| // the order in which frameworks that belong to the same role are allocated |
| // resources inside the role's share. These sorters are used during Level 2 |
| // for both the first and the second stages. |
| hashmap<std::string, process::Owned<Sorter>> frameworkSorters; |
| |
| // Factory function for framework sorters. |
| const std::function<Sorter*()> frameworkSorterFactory; |
| |
| private: |
| bool isFrameworkTrackedUnderRole( |
| const FrameworkID& frameworkId, |
| const std::string& role) const; |
| |
| void trackFrameworkUnderRole( |
| const FrameworkID& frameworkId, |
| const std::string& role); |
| |
| void untrackFrameworkUnderRole( |
| const FrameworkID& frameworkId, |
| const std::string& role); |
| |
| // `trackReservations` and `untrackReservations` are helpers |
| // to track role resource reservations. We need to keep |
| // track of reservations to enforce role quota limit |
| // in the presence of unallocated reservations. See MESOS-4527. |
| // |
| // TODO(mzhu): Ideally, we want these helpers to instead track the |
| // reservations as *allocated* in the sorters even when the |
| // reservations have not been allocated yet. This will help to: |
| // |
| // (1) Solve the fairness issue when roles with unallocated |
| // reservations may game the allocator (See MESOS-8299). |
| // |
| // (2) Simplify the quota enforcement logic -- the allocator |
| // would no longer need to track reservations separately. |
| void trackReservations( |
| const hashmap<std::string, Resources>& reservations); |
| |
| void untrackReservations( |
| const hashmap<std::string, Resources>& reservations); |
| |
| // Helper to update the agent's total resources maintained in the allocator |
| // and the role and quota sorters (whose total resources match the agent's |
| // total resources). Returns true iff the stored agent total was changed. |
| bool updateSlaveTotal(const SlaveID& slaveId, const Resources& total); |
| |
| // Helper that returns true if the given agent is located in a |
| // different region than the master. This can only be the case if |
| // the agent and the master are both configured with a fault domain. |
| bool isRemoteSlave(const Slave& slave) const; |
| |
| // Helper to track allocated resources on an agent. |
| void trackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& allocated); |
| |
| // Helper to untrack resources that are no longer allocated on an agent. |
| void untrackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& allocated); |
| |
| // Helper that removes all existing offer filters for the given slave |
| // id. |
| void removeFilters(const SlaveID& slaveId); |
| }; |
| |
| |
| } // namespace internal { |
| |
| |
| // We map the templatized version of the `HierarchicalAllocatorProcess` to one |
| // that relies on sorter factories in the internal namespace. This allows us |
| // to keep the implementation of the allocator in the implementation file. |
| template < |
| typename RoleSorter, |
| typename FrameworkSorter, |
| typename QuotaRoleSorter> |
| class HierarchicalAllocatorProcess |
| : public internal::HierarchicalAllocatorProcess |
| { |
| public: |
| HierarchicalAllocatorProcess() |
| : ProcessBase(process::ID::generate("hierarchical-allocator")), |
| internal::HierarchicalAllocatorProcess( |
| [this]() -> Sorter* { |
| return new RoleSorter(this->self(), "allocator/mesos/roles/"); |
| }, |
| []() -> Sorter* { return new FrameworkSorter(); }, |
| []() -> Sorter* { return new QuotaRoleSorter(); }) {} |
| }; |
| |
| } // namespace allocator { |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |
| |
| #endif // __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ |