| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #ifndef __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ |
| #define __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ |
| |
| #include <memory> |
| #include <set> |
| #include <string> |
| |
| #include <mesos/mesos.hpp> |
| |
| #include <process/future.hpp> |
| #include <process/id.hpp> |
| #include <process/owned.hpp> |
| |
| #include <stout/boundedhashmap.hpp> |
| #include <stout/duration.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/option.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "master/allocator/mesos/allocator.hpp" |
| #include "master/allocator/mesos/metrics.hpp" |
| |
| #include "master/allocator/mesos/sorter/drf/sorter.hpp" |
| #include "master/allocator/mesos/sorter/random/sorter.hpp" |
| |
| #include "master/constants.hpp" |
| |
| namespace mesos { |
| namespace internal { |
| namespace master { |
| namespace allocator { |
| |
| // We forward declare the hierarchical allocator process so that we |
| // can typedef an instantiation of it with DRF sorters. |
| template < |
| typename RoleSorter, |
| typename FrameworkSorter> |
| class HierarchicalAllocatorProcess; |
| |
| typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter> |
| HierarchicalDRFAllocatorProcess; |
| |
| typedef MesosAllocator<HierarchicalDRFAllocatorProcess> |
| HierarchicalDRFAllocator; |
| |
| typedef HierarchicalAllocatorProcess<RandomSorter, RandomSorter> |
| HierarchicalRandomAllocatorProcess; |
| |
| typedef MesosAllocator<HierarchicalRandomAllocatorProcess> |
| HierarchicalRandomAllocator; |
| |
| |
| namespace internal { |
| |
| // Forward declarations. |
| class OfferFilter; |
| class InverseOfferFilter; |
| class RoleTree; |
| |
| |
| struct Framework |
| { |
| Framework( |
| const FrameworkInfo& frameworkInfo, |
| const std::set<std::string>& suppressedRoles, |
| bool active, |
| bool publishPerFrameworkMetrics); |
| |
| const FrameworkID frameworkId; |
| |
| std::set<std::string> roles; |
| |
| std::set<std::string> suppressedRoles; |
| |
| protobuf::framework::Capabilities capabilities; |
| |
| // Offer filters are tied to the role the filtered |
| // resources were offered to. |
| hashmap<std::string, hashmap<SlaveID, hashset<std::shared_ptr<OfferFilter>>>> |
| offerFilters; |
| |
| hashmap<SlaveID, hashset<std::shared_ptr<InverseOfferFilter>>> |
| inverseOfferFilters; |
| |
| bool active; |
| |
| bool publishPerFrameworkMetrics; |
| |
| process::Owned<FrameworkMetrics> metrics; |
| |
| // TODO(bbannier): Consider documenting examples on how to use this setting. |
| hashmap<std::string, std::vector<ResourceQuantities>> minAllocatableResources; |
| }; |
| |
| |
| // Helper for tracking cross-agent scalar resource totals. |
| // Needed because directly summing Resources across agents has |
| // prohibitively expensive time complexity: O(N^2) vs the number of agents, |
| // and also violates the convention that Resources belonging to different agents |
| // should not be added. |
| class ScalarResourceTotals |
| { |
| public: |
| // These methods implicitly filter out non-scalars from the inputs, thus |
| // the caller is not obliged to ensure that `resources` contains only scalars. |
| void add(const SlaveID& slaveID, const Resources& resources); |
| void subtract(const SlaveID& slaveID, const Resources& resources); |
| |
| bool empty() const { return scalars.empty(); } |
| ResourceQuantities quantities() const { return scalarsTotal; } |
| |
| private: |
| hashmap<SlaveID, Resources> scalars; |
| ResourceQuantities scalarsTotal; |
| }; |
| |
| |
| class Role |
| { |
| public: |
| Role(const std::string& name, Role* parent); |
| |
| const ResourceQuantities& reservationScalarQuantities() const |
| { |
| return reservationScalarQuantities_; |
| } |
| |
| ResourceQuantities offeredOrAllocatedReservedScalarQuantities() const |
| { |
| return offeredOrAllocatedReserved.quantities(); |
| } |
| |
| const hashset<FrameworkID>& frameworks() const { return frameworks_; } |
| |
| const Quota& quota() const { return quota_; } |
| |
| ResourceQuantities quotaOfferedOrConsumed() const |
| { |
| return offeredOrAllocatedUnreservedNonRevocable.quantities() + |
| reservationScalarQuantities_; |
| } |
| |
| ResourceQuantities quotaConsumed() const |
| { |
| return allocatedUnreservedNonRevocable.quantities() + |
| reservationScalarQuantities_; |
| } |
| |
| double weight() const { return weight_; } |
| |
| bool isEmpty() const |
| { |
| return children_.empty() && |
| frameworks_.empty() && |
| reservationScalarQuantities_.empty() && |
| quota_ == DEFAULT_QUOTA && |
| weight_ == DEFAULT_WEIGHT; |
| } |
| |
| std::vector<Role*> children() const { return children_.values(); } |
| |
| const std::string role; // E.g. "a/b/c" |
| const std::string basename; // E.g. "c" |
| |
| private: |
| // We keep fields that are related to the tree structure as private |
| // and only allow mutations through the RoleTree structure. |
| friend class RoleTree; |
| |
| // Add a child to the role, the child must not already exist. |
| void addChild(Role* child); |
| |
| // Remove a child from the role, the child must be present. |
| void removeChild(Role* child); |
| |
| Role* parent; |
| |
| // Configured guaranteed resource quantities and resource limits for |
| // this role. By default, a role has no guarantee and no limit. |
| Quota quota_; |
| |
| // Configured weight for the role. This affects sorting precedence. |
| // By default, weights == DEFAULT_WEIGHT == 1.0. |
| double weight_; |
| |
| // IDs of the frameworks tracked under the role, if any. |
| // A framework is tracked under the role if the framework: |
| // |
| // (1) is subscribed to the role; |
| // *OR* |
| // (2) has resources allocated under the role. |
| // |
| // NOTE: (2) could be true without (1). This is because the allocator |
| // interface allows for a framework role to be removed without recovering |
| // resources offered or allocated to this role. |
| hashset<FrameworkID> frameworks_; |
| |
| // Totals tracker for unreserved non-revocable offered/allocated resources. |
| // Note that since any offered or allocated resources should be tied to |
| // a framework, an empty role (that has no registered framework) must have |
| // this total empty. |
| ScalarResourceTotals offeredOrAllocatedUnreservedNonRevocable; |
| |
| ScalarResourceTotals offeredOrAllocatedReserved; |
| |
| // Aggregated reserved scalar resource quantities on all agents tied to this |
| // role, if any. This includes both its own reservations as well as |
| // reservations of any of its subroles (i.e. it is hierarchical aware). |
| // Note that non-scalar resources, such as ports, are excluded. |
| ResourceQuantities reservationScalarQuantities_; |
| |
| // Totals tracker for unreserved non-revocable resources actually allocated |
| // (i.e. used for launching tasks) to this role and any of its subroles. |
| ScalarResourceTotals allocatedUnreservedNonRevocable; |
| |
| hashmap<std::string, Role*> children_; |
| }; |
| |
| |
| // A tree abstraction for organizing `class Role` hierarchically. |
| // |
| // We track a role when it has: |
| // |
| // * a non-default weight, or |
| // * a non-default quota, or |
| // * frameworks subscribed to it, or |
| // * reservations, or |
| // * descendent roles meeting any of the above conditions. |
| // |
| // Any roles that do not meet these conditions are not tracked in the role tree. |
| class RoleTree |
| { |
| public: |
| RoleTree(); // Only used in tests. |
| |
| RoleTree(Metrics* metrics); |
| |
| ~RoleTree(); |
| |
| Option<const Role*> get(const std::string& role) const; |
| |
| // Return a hashmap of all known roles. Root is not included. |
| const hashmap<std::string, Role>& roles() const { return roles_; } |
| |
| const Role* root() const { return root_; } |
| |
| // We keep track of reservations to enforce role quota limit |
| // in the presence of unallocated reservations. See MESOS-4527. |
| void trackReservations(const Resources& resources); |
| void untrackReservations(const Resources& resources); |
| |
| // We keep track of allocated resources which are actually used by frameworks. |
| void trackAllocated(const SlaveID& slaveId, const Resources& resources); |
| void untrackAllocated(const SlaveID& slaveId, const Resources& resources); |
| |
| void trackFramework( |
| const FrameworkID& frameworkId, const std::string& role); |
| void untrackFramework( |
| const FrameworkID& frameworkId, const std::string& role); |
| |
| void updateQuota(const std::string& role, const Quota& quota); |
| |
| void updateWeight(const std::string& role, double weight); |
| |
| void trackOfferedOrAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources); |
| |
| void untrackOfferedOrAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources); |
| |
| // Dump the role tree state in JSON format for debugging. |
| std::string toJSON() const; |
| |
| private: |
| // Private helper to get non-const pointers. |
| Option<Role*> get_(const std::string& role); |
| |
| // Lookup or add the role struct associated with the role. Ancestor roles |
| // along the tree path will be created if necessary. |
| Role& operator[](const std::string& role); |
| |
| // Helper for modifying a role and all its ancestors. |
| template<class UnaryFunction> |
| static void applyToRoleAndAncestors(Role* role, UnaryFunction f) { |
| for (; role != nullptr; role = role->parent) { |
| f(role); |
| } |
| } |
| |
| // Try to remove the role associated with the given role. |
| // The role must exist. The role and its ancestors will be removed |
| // if they become "empty". See "Role:isEmpty()". |
| // Return true if the role instance associated with the role is removed. |
| // This should be called whenever a role's state (that defines its emptiness) |
| // gets updated, such as quota, weight, reservation and tracked frameworks. |
| // Otherwise the "tracking only non-empty" tree invariant may break. |
| bool tryRemove(const std::string& role); |
| |
| void updateQuotaConsumedMetric(const Role* role); |
| |
| // Root node of the tree, its `basename` == `role` == "". |
| Role* root_; |
| |
| // Allocator's metrics handle for publishing role related metrics. |
| Option<Metrics*> metrics; |
| |
| // A map of role and `Role` pairs for quick lookup. |
| hashmap<std::string, Role> roles_; |
| }; |
| |
| |
| class Slave |
| { |
| public: |
| Slave( |
| const SlaveInfo& _info, |
| const protobuf::slave::Capabilities& _capabilities, |
| bool _activated, |
| const Resources& _total, |
| const hashmap<FrameworkID, Resources>& _allocated) |
| : id(_info.id()), |
| info(_info), |
| capabilities(_capabilities), |
| activated(_activated), |
| totalAllocated(Resources::sum(_allocated)), |
| total(_total), |
| offeredOrAllocated(_allocated), |
| totalOfferedOrAllocated(Resources::sum(_allocated)), |
| shared(_total.shared()), |
| hasGpu_(_total.gpus().getOrElse(0) > 0) |
| { |
| CHECK(_info.has_id()); |
| updateAvailable(); |
| } |
| |
| const Resources& getTotal() const { return total; } |
| |
| const hashmap<FrameworkID, Resources>& getOfferedOrAllocated() const |
| { |
| return offeredOrAllocated; |
| } |
| |
| const Resources& getTotalOfferedOrAllocated() const |
| { |
| return totalOfferedOrAllocated; |
| } |
| |
| const Resources& getAvailable() const { return available; } |
| |
| bool hasGpu() const { return hasGpu_; } |
| |
| void updateTotal(const Resources& newTotal) { |
| total = newTotal; |
| shared = total.shared(); |
| hasGpu_ = total.gpus().getOrElse(0) > 0; |
| |
| updateAvailable(); |
| } |
| |
| void increaseAvailable( |
| const FrameworkID& frameworkId, const Resources& offeredOrAllocated_) |
| { |
| // Increasing available is to subtract offered or allocated. |
| if (offeredOrAllocated_.empty()) { |
| return; |
| } |
| |
| // It is possible that the reference of `offeredOrAllocated_` |
| // points to the same object as `resources` below. We must |
| // do subtraction here before any mutation on the object. |
| totalOfferedOrAllocated -= offeredOrAllocated_; |
| |
| Resources& resources = offeredOrAllocated.at(frameworkId); |
| CHECK_CONTAINS(resources, offeredOrAllocated_); |
| resources -= offeredOrAllocated_; |
| if (resources.empty()) { |
| offeredOrAllocated.erase(frameworkId); |
| } |
| |
| updateAvailable(); |
| } |
| |
| void decreaseAvailable( |
| const FrameworkID& frameworkId, const Resources& offeredOrAllocated_) |
| { |
| if (offeredOrAllocated_.empty()) { |
| return; |
| } |
| |
| // Decreasing available is to add offered or allocated. |
| |
| offeredOrAllocated[frameworkId] += offeredOrAllocated_; |
| |
| totalOfferedOrAllocated += offeredOrAllocated_; |
| |
| updateAvailable(); |
| } |
| |
| const SlaveID id; |
| |
| // The `SlaveInfo` that was passed to the allocator when the slave was added |
| // or updated. Currently only two fields are used: `hostname` for host |
| // whitelisting and in log messages, and `domain` for region-aware |
| // scheduling. |
| SlaveInfo info; |
| |
| protobuf::slave::Capabilities capabilities; |
| |
| bool activated; // Whether to offer resources. |
| |
| // Represents a scheduled unavailability due to maintenance for a specific |
| // slave, and the responses from frameworks as to whether they will be able |
| // to gracefully handle this unavailability. |
| // |
| // NOTE: We currently implement maintenance in the allocator to be able to |
| // leverage state and features such as the FrameworkSorter and OfferFilter. |
| struct Maintenance |
| { |
| Maintenance(const Unavailability& _unavailability) |
| : unavailability(_unavailability) {} |
| |
| // The start time and optional duration of the event. |
| Unavailability unavailability; |
| |
| // A mapping of frameworks to the inverse offer status associated with |
| // this unavailability. |
| // |
| // NOTE: We currently lose this information during a master fail over |
| // since it is not persisted or replicated. This is ok as the new master's |
| // allocator will send out new inverse offers and re-collect the |
| // information. This is similar to all the outstanding offers from an old |
| // master being invalidated, and new offers being sent out. |
| hashmap<FrameworkID, mesos::allocator::InverseOfferStatus> statuses; |
| |
| // Represents the "unit of accounting" for maintenance. When a |
| // `FrameworkID` is present in the hashset it means an inverse offer has |
| // been sent out. When it is not present it means no offer is currently |
| // outstanding. |
| hashset<FrameworkID> offersOutstanding; |
| }; |
| |
| // When the `maintenance` is set the slave is scheduled to be unavailable at |
| // a given point in time, for an optional duration. This information is used |
| // to send out `InverseOffers`. |
| Option<Maintenance> maintenance; |
| |
| // Sum of all allocated (i.e. occupied by running tasks) resources on the |
| // agent. This information is needed to untrack allocated resources when the |
| // agent is removed, because the master is not obligated to separately inform |
| // allocator that resources of the removed agent are not offered/allocated |
| // anymore. |
| Resources totalAllocated; |
| |
| private: |
| void updateAvailable() |
| { |
| // In order to subtract from the total, |
| // we strip the allocation information. |
| Resources totalOfferedOrAllocated_ = totalOfferedOrAllocated; |
| totalOfferedOrAllocated_.unallocate(); |
| |
| // This is hot path. We avoid the unnecessary resource traversals |
| // in the common case where there are no shared resources. |
| if (shared.empty()) { |
| available = total - totalOfferedOrAllocated_; |
| } else { |
| // Since shared resources are offerable even when they are in use, we |
| // always include them as part of available resources. |
| available = |
| (total.nonShared() - totalOfferedOrAllocated_.nonShared()) + shared; |
| } |
| } |
| |
| // Total amount of regular *and* oversubscribed resources. |
| Resources total; |
| |
| // NOTE: We keep track of the slave's allocated resources despite |
| // having that information in sorters. This is because the |
| // information in sorters is not accurate if some framework |
| // hasn't reregistered. See MESOS-2919 for details. |
| // |
| // This includes both regular *and* oversubscribed resources. |
| // |
| // An entry is erased if a framework no longer has any |
| // offered or allocated on the agent. |
| hashmap<FrameworkID, Resources> offeredOrAllocated; |
| |
| // Sum of all offered or allocated resources on the agent. This should equal |
| // to sum of `offeredOrAllocated` (including all the meta-data). |
| Resources totalOfferedOrAllocated; |
| |
| // We track the total and allocated resources on the slave to |
| // avoid calculating it in place every time. |
| // |
| // Note that `available` always contains all the shared resources on the |
| // agent regardless whether they have ever been allocated or not. |
| // NOTE, however, we currently only offer a shared resource only if it has |
| // not been offered in an allocation cycle to a framework. We do this mainly |
| // to preserve the normal offer behavior. This may change in the future |
| // depending on use cases. |
| // |
| // Note that it's possible for the slave to be over-allocated! |
| // In this case, allocated > total. |
| Resources available; |
| |
| // We keep a copy of the shared resources to avoid unnecessary copying. |
| Resources shared; |
| |
| // We cache whether the agent has gpus as an optimization. |
| bool hasGpu_; |
| }; |
| |
| |
| // Implements the basic allocator algorithm - first pick a role by |
| // some criteria, then pick one of their frameworks to allocate to. |
| class HierarchicalAllocatorProcess : public MesosAllocatorProcess |
| { |
| public: |
| HierarchicalAllocatorProcess( |
| const std::function<Sorter*()>& roleSorterFactory, |
| const std::function<Sorter*()>& _frameworkSorterFactory) |
| : initialized(false), |
| paused(true), |
| metrics(*this), |
| completedFrameworkMetrics(0), |
| roleTree(&metrics), |
| roleSorter(roleSorterFactory()), |
| frameworkSorterFactory(_frameworkSorterFactory) {} |
| |
| ~HierarchicalAllocatorProcess() override {} |
| |
| process::PID<HierarchicalAllocatorProcess> self() const |
| { |
| return process::PID<Self>(this); |
| } |
| |
| void initialize( |
| const mesos::allocator::Options& options, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<std::string, hashmap<SlaveID, Resources>>&)>& |
| offerCallback, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<SlaveID, UnavailableResources>&)>& |
| inverseOfferCallback) override; |
| |
| void recover( |
| const int _expectedAgentCount, |
| const hashmap<std::string, Quota>& quotas) override; |
| |
| void addFramework( |
| const FrameworkID& frameworkId, |
| const FrameworkInfo& frameworkInfo, |
| const hashmap<SlaveID, Resources>& used, |
| bool active, |
| const std::set<std::string>& suppressedRoles) override; |
| |
| void removeFramework( |
| const FrameworkID& frameworkId) override; |
| |
| void activateFramework( |
| const FrameworkID& frameworkId) override; |
| |
| void deactivateFramework( |
| const FrameworkID& frameworkId) override; |
| |
| void updateFramework( |
| const FrameworkID& frameworkId, |
| const FrameworkInfo& frameworkInfo, |
| const std::set<std::string>& suppressedRoles) override; |
| |
| void addSlave( |
| const SlaveID& slaveId, |
| const SlaveInfo& slaveInfo, |
| const std::vector<SlaveInfo::Capability>& capabilities, |
| const Option<Unavailability>& unavailability, |
| const Resources& total, |
| const hashmap<FrameworkID, Resources>& used) override; |
| |
| void removeSlave( |
| const SlaveID& slaveId) override; |
| |
| void updateSlave( |
| const SlaveID& slave, |
| const SlaveInfo& slaveInfo, |
| const Option<Resources>& total = None(), |
| const Option<std::vector<SlaveInfo::Capability>>& capabilities = None()) |
| override; |
| |
| void addResourceProvider( |
| const SlaveID& slave, |
| const Resources& total, |
| const hashmap<FrameworkID, Resources>& used) override; |
| |
| void deactivateSlave( |
| const SlaveID& slaveId) override; |
| |
| void activateSlave( |
| const SlaveID& slaveId) override; |
| |
| void updateWhitelist( |
| const Option<hashset<std::string>>& whitelist) override; |
| |
| void requestResources( |
| const FrameworkID& frameworkId, |
| const std::vector<Request>& requests) override; |
| |
| void updateAllocation( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const Resources& offeredResources, |
| const std::vector<ResourceConversion>& conversions) override; |
| |
| process::Future<Nothing> updateAvailable( |
| const SlaveID& slaveId, |
| const std::vector<Offer::Operation>& operations) override; |
| |
| void updateUnavailability( |
| const SlaveID& slaveId, |
| const Option<Unavailability>& unavailability) override; |
| |
| void updateInverseOffer( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Option<UnavailableResources>& unavailableResources, |
| const Option<mesos::allocator::InverseOfferStatus>& status, |
| const Option<Filters>& filters) override; |
| |
| process::Future< |
| hashmap<SlaveID, |
| hashmap<FrameworkID, mesos::allocator::InverseOfferStatus>>> |
| getInverseOfferStatuses() override; |
| |
| void transitionOfferedToAllocated( |
| const SlaveID& slaveId, const Resources& resources) override; |
| |
| void recoverResources( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const Resources& resources, |
| const Option<Filters>& filters, |
| bool isAllocated) override; |
| |
| void suppressOffers( |
| const FrameworkID& frameworkId, |
| const std::set<std::string>& roles) override; |
| |
| void reviveOffers( |
| const FrameworkID& frameworkId, |
| const std::set<std::string>& roles) override; |
| |
| void updateQuota( |
| const std::string& role, |
| const Quota& quota) override; |
| |
| void updateWeights( |
| const std::vector<WeightInfo>& weightInfos) override; |
| |
| void pause() override; |
| |
| void resume() override; |
| |
| protected: |
| // Useful typedefs for dispatch/delay/defer to self()/this. |
| typedef HierarchicalAllocatorProcess Self; |
| typedef HierarchicalAllocatorProcess This; |
| |
| // Generate offers from all known agents. |
| process::Future<Nothing> generateOffers(); |
| |
| // Generate offers from the specified agent. |
| process::Future<Nothing> generateOffers(const SlaveID& slaveId); |
| |
| // Generate offers from the specified agents. The offer generation is |
| // deferred and batched with other offer generation requests. |
| process::Future<Nothing> generateOffers(const hashset<SlaveID>& slaveIds); |
| |
| Nothing _generateOffers(); |
| |
| void __generateOffers(); |
| |
| void generateInverseOffers(); |
| |
| // Remove an offer filter for the specified role of the framework. |
| void expire( |
| const FrameworkID& frameworkId, |
| const std::string& role, |
| const SlaveID& slaveId, |
| const std::weak_ptr<OfferFilter>& offerFilter); |
| |
| void _expire( |
| const FrameworkID& frameworkId, |
| const std::string& role, |
| const SlaveID& slaveId, |
| const std::weak_ptr<OfferFilter>& offerFilter); |
| |
| // Remove an inverse offer filter for the specified framework. |
| void expire( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const std::weak_ptr<InverseOfferFilter>& inverseOfferFilter); |
| |
| // Checks whether the slave is whitelisted. |
| bool isWhitelisted(const SlaveID& slaveId) const; |
| |
| // Returns true if there is a resource offer filter for the |
| // specified role of this framework on this slave. |
| bool isFiltered( |
| const Framework& framework, |
| const std::string& role, |
| const Slave& slave, |
| const Resources& resources) const; |
| |
| // Returns true if there is an inverse offer filter for this framework |
| // on this slave. |
| bool isFiltered( |
| const Framework& framework, |
| const Slave& slave) const; |
| |
| bool allocatable( |
| const Resources& resources, |
| const std::string& role, |
| const Framework& framework) const; |
| |
| bool initialized; |
| bool paused; |
| |
| mesos::allocator::Options options; |
| |
| // Recovery data. |
| Option<int> expectedAgentCount; |
| |
| lambda::function< |
| void(const FrameworkID&, |
| const hashmap<std::string, hashmap<SlaveID, Resources>>&)> |
| offerCallback; |
| |
| lambda::function< |
| void(const FrameworkID&, |
| const hashmap<SlaveID, UnavailableResources>&)> |
| inverseOfferCallback; |
| |
| friend Metrics; |
| Metrics metrics; |
| |
| double _event_queue_dispatches() |
| { |
| return static_cast<double>(eventCount<process::DispatchEvent>()); |
| } |
| |
| double _resources_total( |
| const std::string& resource); |
| |
| double _resources_offered_or_allocated( |
| const std::string& resource); |
| |
| double _quota_offered_or_allocated( |
| const std::string& role, |
| const std::string& resource); |
| |
| double _offer_filters_active( |
| const std::string& role); |
| |
| hashmap<FrameworkID, Framework> frameworks; |
| |
| BoundedHashMap<FrameworkID, process::Owned<FrameworkMetrics>> |
| completedFrameworkMetrics; |
| |
| hashmap<SlaveID, Slave> slaves; |
| |
| // Total scalar resource quantities on all agents. |
| ResourceQuantities totalScalarQuantities; |
| |
| RoleTree roleTree; |
| |
| // A set of agents that are kept as allocation candidates. Events |
| // may add or remove candidates to the set. When an offer generation is |
| // processed, the set of candidates is cleared. |
| hashset<SlaveID> allocationCandidates; |
| |
| // Future for the dispatched offer generation that becomes |
| // ready after the offer generation run is complete. |
| Option<process::Future<Nothing>> offerGeneration; |
| |
| // Slaves to send offers for. |
| Option<hashset<std::string>> whitelist; |
| |
| // There are two stages of offer generation: |
| // |
| // Stage 1: Generate offers to satisfy quota guarantees. |
| // |
| // Stage 2: Generate offers above quota guarantees up to quota limits. |
| // Note that we need to hold back enough "headroom" |
| // to ensure that any unsatisfied quota can be |
| // satisfied later. |
| // |
| // Each stage comprises two levels of sorting, hence "hierarchical". |
| // Level 1 sorts across roles: |
| // Currently, only the offered or allocated portion of the reserved |
| // resources are accounted for fairness calculation. |
| // |
| // TODO(mpark): Reserved resources should be accounted for fairness |
| // calculation whether they are offered/allocated or not, since they model |
| // a long or forever running task. That is, the effect of reserving resources |
| // is equivalent to launching a task in that the resources that make up the |
| // reservation are not available to other roles as non-revocable. |
| // |
| // Level 2 sorts across frameworks within a particular role: |
| // Reserved resources at this level are, and should be accounted for |
| // fairness calculation only if they are allocated. This is because |
| // reserved resources are fairly shared across the frameworks in the role. |
| // |
| // The allocator relies on `Sorter`s to employ a particular sorting |
| // algorithm. Each level has its own sorter and hence may have different |
| // fairness calculations. |
| // |
| // NOTE: The hierarchical allocator considers revocable resources as |
| // regular resources when doing fairness calculations. |
| // |
| // TODO(vinod): Consider using a different fairness algorithm for |
| // revocable resources. |
| |
| // A sorter for active roles. This sorter determines the order in which |
| // roles are offered resources during Level 1 of the second stage. |
| // The total cluster resources are used as the resource pool. |
| process::Owned<Sorter> roleSorter; |
| |
| // A collection of sorters, one per active role. Each sorter determines |
| // the order in which frameworks that belong to the same role are offered |
| // resources inside the role's share. These sorters are used during Level 2 |
| // for both the first and the second stages. Since frameworks are sharing |
| // resources of a role, resources offered or allocated to the role are used as |
| // the resource pool for each role specific framework sorter. |
| hashmap<std::string, process::Owned<Sorter>> frameworkSorters; |
| |
| // Factory function for framework sorters. |
| const std::function<Sorter*()> frameworkSorterFactory; |
| |
| private: |
| bool isFrameworkTrackedUnderRole( |
| const FrameworkID& frameworkId, |
| const std::string& role) const; |
| |
| Option<Slave*> getSlave(const SlaveID& slaveId) const; |
| Option<Framework*> getFramework(const FrameworkID& frameworkId) const; |
| |
| Option<Sorter*> getFrameworkSorter(const std::string& role) const; |
| |
| const Quota& getQuota(const std::string& role) const; |
| |
| // Helpers to track and untrack a framework under a role. |
| // Frameworks should be tracked under a role either if it subscribes to the |
| // role *OR* it has resources allocated/offered to that role. when neither |
| // conditions are met, it should be untracked. |
| // |
| // `tryUntrackFrameworkUnderRole` returns true if the framework is untracked |
| // under the role. |
| void trackFrameworkUnderRole( |
| const Framework& framework, const std::string& role); |
| bool tryUntrackFrameworkUnderRole( |
| const Framework& framework, const std::string& role); |
| |
| void suppressRoles(Framework& framework, const std::set<std::string>& roles); |
| void reviveRoles(Framework& framework, const std::set<std::string>& roles); |
| |
| // Helper to update the agent's total resources maintained in the allocator |
| // and the role and quota sorters (whose total resources match the agent's |
| // total resources). Returns true iff the stored agent total was changed. |
| bool updateSlaveTotal(const SlaveID& slaveId, const Resources& total); |
| |
| // Helper that returns true if the given agent is located in a |
| // different region than the master. This can only be the case if |
| // the agent and the master are both configured with a fault domain. |
| bool isRemoteSlave(const Slave& slave) const; |
| |
| // Helper function that checks if a framework is capable of |
| // receiving resources on the agent based on the framework capability. |
| // |
| // TODO(mzhu): Make this a `Framework` member function once we pull |
| // `struct Framework` out from being nested. |
| bool isCapableOfReceivingAgent( |
| const protobuf::framework::Capabilities& frameworkCapabilities, |
| const Slave& slave) const; |
| |
| // Helper function that removes any resources that the framework is not |
| // capable of receiving based on the given framework capability. |
| // |
| // TODO(mzhu): Make this a `Framework` member function once we pull |
| // `struct Framework` out from being nested. |
| Resources stripIncapableResources( |
| const Resources& resources, |
| const protobuf::framework::Capabilities& frameworkCapabilities) const; |
| |
| // Helper to track offered or allocated resources on an agent. |
| // |
| // TODO(asekretenko): rename `(un)trackAllocatedResources()` to reflect the |
| // fact that these methods do not distinguish between offered and allocated. |
| // |
| // TODO(mzhu): replace this with `RoleTree::trackOfferedOrAllocated`. |
| void trackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& offeredOrAllocated); |
| |
| // Helper to untrack resources that are no longer offered or allocated |
| // on an agent. |
| // |
| // TODO(mzhu): replace this with `RoleTree::untrackOfferedOrAllocated`. |
| void untrackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& offeredOrallocated); |
| |
| // Helper that removes all existing offer filters for the given slave |
| // id. |
| void removeFilters(const SlaveID& slaveId); |
| }; |
| |
| |
| } // namespace internal { |
| |
| |
| // We map the templatized version of the `HierarchicalAllocatorProcess` to one |
| // that relies on sorter factories in the internal namespace. This allows us |
| // to keep the implementation of the allocator in the implementation file. |
| template < |
| typename RoleSorter, |
| typename FrameworkSorter> |
| class HierarchicalAllocatorProcess |
| : public internal::HierarchicalAllocatorProcess |
| { |
| public: |
| HierarchicalAllocatorProcess() |
| : ProcessBase(process::ID::generate("hierarchical-allocator")), |
| internal::HierarchicalAllocatorProcess( |
| [this]() -> Sorter* { |
| return new RoleSorter(this->self(), "allocator/mesos/roles/"); |
| }, |
| []() -> Sorter* { return new FrameworkSorter(); }) {} |
| }; |
| |
| } // namespace allocator { |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |
| |
| #endif // __MASTER_ALLOCATOR_MESOS_HIERARCHICAL_HPP__ |