| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "master/allocator/mesos/hierarchical.hpp" |
| |
| #include <algorithm> |
| #include <set> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <mesos/attributes.hpp> |
| #include <mesos/resource_quantities.hpp> |
| #include <mesos/resources.hpp> |
| #include <mesos/roles.hpp> |
| #include <mesos/type_utils.hpp> |
| |
| #include <process/after.hpp> |
| #include <process/delay.hpp> |
| #include <process/dispatch.hpp> |
| #include <process/event.hpp> |
| #include <process/id.hpp> |
| #include <process/loop.hpp> |
| #include <process/timeout.hpp> |
| |
| #include <stout/check.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/set.hpp> |
| #include <stout/stopwatch.hpp> |
| #include <stout/stringify.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/protobuf_utils.hpp" |
| #include "common/resources_utils.hpp" |
| |
| using std::make_shared; |
| using std::set; |
| using std::shared_ptr; |
| using std::string; |
| using std::vector; |
| using std::weak_ptr; |
| |
| using mesos::allocator::InverseOfferStatus; |
| using mesos::allocator::Options; |
| |
| using process::after; |
| using process::Continue; |
| using process::ControlFlow; |
| using process::Failure; |
| using process::Future; |
| using process::loop; |
| using process::Owned; |
| using process::PID; |
| using process::Timeout; |
| |
| |
| namespace mesos { |
| |
| // Needed to prevent shadowing of template '::operator-<std::set<T>>' |
| // by non-template '::mesos::operator-' |
| using ::operator-; |
| |
| namespace internal { |
| namespace master { |
| namespace allocator { |
| namespace internal { |
| |
| // Used to represent "filters" for resources unused in offers. |
// Used to represent "filters" for resources unused in offers.
//
// Implementations decide whether a given set of resources should be
// withheld (filtered) from the framework's next offers.
class OfferFilter
{
public:
  virtual ~OfferFilter() {}

  // Returns true if `resources` should be filtered out (i.e. not
  // offered to the framework while this filter is in place).
  virtual bool filter(const Resources& resources) const = 0;
};
| |
| |
| class RefusedOfferFilter : public OfferFilter |
| { |
| public: |
| RefusedOfferFilter( |
| const Resources& _resources, |
| const Duration& timeout) |
| : _resources(_resources), |
| _expired(after(timeout)) {} |
| |
| ~RefusedOfferFilter() override |
| { |
| // Cancel the timeout upon destruction to avoid lingering timers. |
| _expired.discard(); |
| } |
| |
| Future<Nothing> expired() const { return _expired; }; |
| |
| bool filter(const Resources& resources) const override |
| { |
| // NOTE: We do not check for the filter being expired here |
| // because `recoverResources()` expects the filter to apply |
| // until the filter is removed, see: |
| // https://github.com/apache/mesos/commit/2f170f302fe94c4 |
| // |
| // TODO(jieyu): Consider separating the superset check for regular |
| // and revocable resources. For example, frameworks might want |
| // more revocable resources only or non-revocable resources only, |
| // but currently the filter only expires if there is more of both |
| // revocable and non-revocable resources. |
| return _resources.contains(resources); // Refused resources are superset. |
| } |
| |
| private: |
| const Resources _resources; |
| Future<Nothing> _expired; |
| }; |
| |
| |
| // Used to represent "filters" for inverse offers. |
| // |
| // NOTE: Since this specific allocator implementation only sends inverse offers |
| // for maintenance primitives, and those are at the whole slave level, we only |
| // need to filter based on the time-out. |
| // If this allocator implementation starts sending out more resource specific |
| // inverse offers, then we can capture the `unavailableResources` in the filter |
| // function. |
// Used to represent "filters" for inverse offers.
//
// NOTE: Since this specific allocator implementation only sends inverse offers
// for maintenance primitives, and those are at the whole slave level, we only
// need to filter based on the time-out.
// If this allocator implementation starts sending out more resource specific
// inverse offers, then we can capture the `unavailableResources` in the filter
// function.
class InverseOfferFilter
{
public:
  virtual ~InverseOfferFilter() {}

  // Returns true if inverse offers should currently be withheld.
  virtual bool filter() const = 0;
};
| |
| |
| // NOTE: See comment above `InverseOfferFilter` regarding capturing |
| // `unavailableResources` if this allocator starts sending fine-grained inverse |
| // offers. |
| class RefusedInverseOfferFilter : public InverseOfferFilter |
| { |
| public: |
| RefusedInverseOfferFilter(const Duration& timeout) |
| : _expired(after(timeout)) {} |
| |
| ~RefusedInverseOfferFilter() override |
| { |
| // Cancel the timeout upon destruction to avoid lingering timers. |
| _expired.discard(); |
| } |
| |
| Future<Nothing> expired() const { return _expired; }; |
| |
| bool filter() const override |
| { |
| // See comment above why we currently don't do more fine-grained filtering. |
| return _expired.isPending(); |
| } |
| |
| private: |
| Future<Nothing> _expired; |
| }; |
| |
| |
| // Helper function to unpack a map of per-role `OfferFilters` to the |
| // format used by the allocator. |
| static hashmap<string, vector<ResourceQuantities>> unpackFrameworkOfferFilters( |
| const ::google::protobuf::Map<string, OfferFilters>& roleOfferFilters) |
| { |
| hashmap<string, vector<ResourceQuantities>> result; |
| |
| // Use `auto` in place of `protobuf::MapPair<string, AllocatableResources>` |
| // below since `foreach` is a macro and cannot contain angle brackets. |
| foreach (auto&& offerFilters, roleOfferFilters) { |
| const string& role = offerFilters.first; |
| const OfferFilters& allocatableResources = offerFilters.second; |
| |
| if (allocatableResources.has_min_allocatable_resources()) { |
| result.insert({role, {}}); |
| |
| vector<ResourceQuantities>& allocatableResourcesRole = result[role]; |
| |
| foreach ( |
| const OfferFilters::ResourceQuantities& quantities, |
| allocatableResources.min_allocatable_resources().quantities()) { |
| allocatableResourcesRole.push_back( |
| ResourceQuantities(quantities.quantities())); |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| |
| void ScalarResourceTotals::add( |
| const SlaveID& slaveID, |
| const Resources& resources) |
| { |
| if (resources.scalars().empty()) { |
| // In this case, we avoid adding an entry to `scalars` to maintain the |
| // invariant that `scalars` doesn't track agents with empty resources. |
| return; |
| } |
| |
| scalarsTotal -= ResourceQuantities::fromScalarResources(scalars[slaveID]); |
| scalars.at(slaveID) += resources.scalars(); |
| scalarsTotal += ResourceQuantities::fromScalarResources(scalars.at(slaveID)); |
| } |
| |
| |
| void ScalarResourceTotals::subtract( |
| const SlaveID& slaveID, |
| const Resources& resources) |
| { |
| if (resources.scalars().empty()) { |
| // `scalars` does not track agents with empty resources, thus subtracting |
| // empty resources from an agent is valid regardless of whether its |
| // resources are tracked in `scalars`. |
| return; |
| } |
| |
| CHECK_CONTAINS(scalars, slaveID); |
| CHECK_CONTAINS(scalars.at(slaveID), resources.scalars()); |
| |
| scalarsTotal -= ResourceQuantities::fromScalarResources(scalars.at(slaveID)); |
| scalars.at(slaveID) -= resources.scalars(); |
| scalarsTotal += ResourceQuantities::fromScalarResources(scalars.at(slaveID)); |
| |
| if (scalars.at(slaveID).empty()) { |
| scalars.erase(slaveID); |
| } |
| } |
| |
| |
// Constructs a role tree node.
//
// `_role` is the full role path (e.g. "a/b/c"); `basename` is its
// last path component ("c"). `_parent` is the parent node (only the
// root has no parent). Quota and weight start at their defaults.
Role::Role(const string& _role, Role* _parent)
  : role(_role),
    basename(strings::split(role, "/").back()),
    parent(_parent),
    quota_(DEFAULT_QUOTA),
    weight_(DEFAULT_WEIGHT) {}
| |
| |
| void Role::addChild(Role* child) |
| { |
| CHECK_NOT_CONTAINS(children_, child->basename); |
| children_.put(child->basename, child); |
| } |
| |
| |
| void Role::removeChild(Role* child) |
| { |
| CHECK_CONTAINS(children_, child->basename); |
| children_.erase(child->basename); |
| } |
| |
| |
| RoleTree::RoleTree() : root_(new Role("", nullptr)) {} |
| |
| |
// Constructs a role tree that reports role additions/removals and
// quota consumption to `metrics_`.
RoleTree::RoleTree(Metrics* metrics_)
  : root_(new Role("", nullptr)), metrics(metrics_) {}
| |
| |
RoleTree::~RoleTree()
{
  // Only the root is heap-allocated; all other nodes are stored by
  // value in `roles_` and destroyed with it.
  delete root_;
}
| |
| |
| Option<const Role*> RoleTree::get(const std::string& role) const |
| { |
| auto found = roles_.find(role); |
| |
| if (found == roles_.end()) { |
| return None(); |
| } else { |
| return &(found->second); |
| } |
| } |
| |
| |
| Option<Role*> RoleTree::get_(const std::string& role) |
| { |
| auto found = roles_.find(role); |
| |
| if (found == roles_.end()) { |
| return None(); |
| } else { |
| return &(found->second); |
| } |
| } |
| |
| |
| Role& RoleTree::operator[](const std::string& rolePath) |
| { |
| if (roles_.contains(rolePath)) { |
| return roles_.at(rolePath); |
| } |
| |
| // We go through the path from top to bottom and create any missing |
| // node along the way. |
| Role* current = root_; |
| foreach (const string& token, strings::split(rolePath, "/")) { |
| Option<Role*> child = current->children_.get(token); |
| |
| if (child.isSome()) { |
| current = *child; |
| continue; |
| } |
| |
| // Create a new role. |
| string newRolePath = |
| current == root_ ? token : strings::join("/", current->role, token); |
| CHECK_NOT_CONTAINS(roles_, newRolePath); |
| roles_.put(newRolePath, Role(newRolePath, current)); |
| |
| if (metrics.isSome()) { |
| (*metrics)->addRole(newRolePath); |
| } |
| |
| Role& role = roles_.at(newRolePath); |
| current->addChild(&role); |
| current = &role; |
| } |
| |
| return roles_.at(rolePath); |
| } |
| |
| |
// Removes the role node — and any ancestors that become empty as a
// result — from the tree. Returns false (removing nothing) if the
// role is still in use (`isEmpty()` is false).
bool RoleTree::tryRemove(const std::string& role)
{
  CHECK_CONTAINS(roles_, role);
  Role* current = &(roles_.at(role));

  if (!current->isEmpty()) {
    return false;
  }

  // We go through the path from bottom to top and remove empty nodes
  // along the way.
  vector<string> tokens = strings::split(role, "/");
  for (auto token = tokens.crbegin(); token != tokens.crend(); ++token) {
    CHECK_EQ(current->basename, *token);
    // An ancestor that is still non-empty stops the upward removal.
    if (!current->isEmpty()) {
      break;
    }

    // Sanity checks: an empty role must have no resources still
    // accounted to it.
    CHECK(current->allocatedUnreservedNonRevocable.empty())
      << "An empty role " << current->role
      << " has non-empty allocated scalar resources: "
      << current->allocatedUnreservedNonRevocable.quantities();

    Role* parent = CHECK_NOTNULL(current->parent);

    parent->removeChild(current);

    if (metrics.isSome()) {
      (*metrics)->removeRole(current->role);
    }

    CHECK(current->offeredOrAllocatedUnreservedNonRevocable.empty())
      << "An empty role " << current->role
      << " has non-empty offered or allocated"
      << " unreserved non-revocable scalar resources: "
      << current->offeredOrAllocatedUnreservedNonRevocable.quantities();

    CHECK(current->offeredOrAllocatedReserved.empty())
      << "An empty role " << current->role
      << " has non-empty offered or allocated reserved scalar resources: "
      << current->offeredOrAllocatedReserved.quantities();

    // NOTE: erasing from `roles_` destroys `*current`; continue the
    // walk from the (still valid) parent pointer.
    roles_.erase(current->role);

    current = parent;
  }

  return true;
}
| |
| |
| void RoleTree::updateQuotaConsumedMetric(const Role* role) |
| { |
| if (metrics.isSome()) { |
| (*metrics)->updateConsumed(role->role, role->quotaConsumed()); |
| } |
| } |
| |
| |
| void RoleTree::trackReservations(const Resources& resources) |
| { |
| foreach (const Resource& r, resources.scalars()) { |
| CHECK(Resources::isReserved(r)); |
| |
| const string& reservationRole = Resources::reservationRole(r); |
| ResourceQuantities quantities = ResourceQuantities::fromScalarResources(r); |
| |
| // NOTE: If necessary, a new role tree node is created. |
| applyToRoleAndAncestors(&(*this)[reservationRole], [&](Role* current) { |
| current->reservationScalarQuantities_ += quantities; |
| updateQuotaConsumedMetric(current); |
| }); |
| } |
| } |
| |
| |
| void RoleTree::untrackReservations(const Resources& resources) |
| { |
| foreach (const Resource& r, resources.scalars()) { |
| CHECK(Resources::isReserved(r)); |
| |
| const string& reservationRole = Resources::reservationRole(r); |
| ResourceQuantities quantities = ResourceQuantities::fromScalarResources(r); |
| |
| applyToRoleAndAncestors( |
| CHECK_NOTNONE(get_(reservationRole)), [&](Role* current) { |
| CHECK_CONTAINS(current->reservationScalarQuantities_, quantities); |
| current->reservationScalarQuantities_ -= quantities; |
| updateQuotaConsumedMetric(current); |
| }); |
| |
| tryRemove(reservationRole); |
| } |
| } |
| |
| |
| void RoleTree::trackAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources_) |
| { |
| foreachpair ( |
| const string& role, |
| const Resources& resources, |
| resources_.scalars().unreserved().nonRevocable().allocations()) { |
| applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) { |
| current->allocatedUnreservedNonRevocable.add(slaveId, resources); |
| updateQuotaConsumedMetric(current); |
| }); |
| } |
| } |
| |
| |
| void RoleTree::untrackAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources_) |
| { |
| foreachpair ( |
| const string& role, |
| const Resources& resources, |
| resources_.scalars().unreserved().nonRevocable().allocations()) { |
| applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) { |
| current->allocatedUnreservedNonRevocable.subtract(slaveId, resources); |
| updateQuotaConsumedMetric(current); |
| }); |
| } |
| } |
| |
| |
| void RoleTree::trackFramework( |
| const FrameworkID& frameworkId, const string& rolePath) |
| { |
| Role* role = &(*this)[rolePath]; |
| |
| CHECK_NOT_CONTAINS(role->frameworks_, frameworkId) |
| << " for role " << rolePath; |
| role->frameworks_.insert(frameworkId); |
| } |
| |
| |
| void RoleTree::untrackFramework( |
| const FrameworkID& frameworkId, const string& rolePath) |
| { |
| CHECK_CONTAINS(roles_, rolePath); |
| Role& role = roles_.at(rolePath); |
| |
| CHECK_CONTAINS(role.frameworks_, frameworkId) << " for role " << rolePath; |
| role.frameworks_.erase(frameworkId); |
| |
| tryRemove(rolePath); |
| } |
| |
| |
| void RoleTree::updateQuota(const string& role, const Quota& quota) |
| { |
| (*this)[role].quota_ = quota; |
| |
| tryRemove(role); |
| } |
| |
| |
| void RoleTree::updateWeight(const string& role, double weight) |
| { |
| (*this)[role].weight_ = weight; |
| |
| tryRemove(role); |
| } |
| |
| |
| void RoleTree::trackOfferedOrAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources_) |
| { |
| // TODO(mzhu): avoid building a map by traversing `resources` |
| // and look for the allocation role of individual resource. |
| // However, due to MESOS-9242, this currently does not work |
| // as traversing resources would lose the shared count. |
| foreachpair ( |
| const string& role, |
| const Resources& resources, |
| resources_.scalars().allocations()) { |
| applyToRoleAndAncestors( |
| CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) { |
| current->offeredOrAllocatedReserved.add( |
| slaveId, resources.reserved()); |
| |
| current->offeredOrAllocatedUnreservedNonRevocable.add( |
| slaveId, resources.unreserved().nonRevocable()); |
| }); |
| } |
| } |
| |
| |
| void RoleTree::untrackOfferedOrAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources_) |
| { |
| // TODO(mzhu): avoid building a map by traversing `resources` |
| // and look for the allocation role of individual resource. |
| // However, due to MESOS-9242, this currently does not work |
| // as traversing resources would lose the shared count. |
| foreachpair ( |
| const string& role, |
| const Resources& resources, |
| resources_.scalars().allocations()) { |
| applyToRoleAndAncestors( |
| CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) { |
| current->offeredOrAllocatedReserved.subtract( |
| slaveId, resources.reserved()); |
| |
| current->offeredOrAllocatedUnreservedNonRevocable.subtract( |
| slaveId, resources.unreserved().nonRevocable()); |
| }); |
| } |
| } |
| |
| |
// Serializes the whole role tree (rooted at `root_`) to a JSON
// string: per-role weight, quota guarantees/limits, reservation and
// offered/allocated quantities, subscribed framework IDs, and
// children (recursively).
std::string RoleTree::toJSON() const
{
  // Recursive object writer; declared as a `std::function` so the
  // lambda can refer to itself when descending into children.
  std::function<void(JSON::ObjectWriter*, const Role*)> json =
    [&](JSON::ObjectWriter* writer, const Role* role) {
      writer->field("basename", role->basename);
      writer->field("role", role->role);
      writer->field("weight", role->weight_);
      writer->field("guarantees", role->quota_.guarantees);
      writer->field("limits", role->quota_.limits);
      writer->field(
          "reservation_quantities", role->reservationScalarQuantities_);

      writer->field(
          "offered_or_allocated_reserved_quantities",
          role->offeredOrAllocatedReserved.quantities());

      writer->field(
          "offered_or_allocated_unreserved_nonrevocable_quantities",
          role->offeredOrAllocatedUnreservedNonRevocable.quantities());

      writer->field("frameworks", [&](JSON::ArrayWriter* writer) {
        foreach (const FrameworkID& id, role->frameworks_) {
          writer->element(id.value());
        }
      });

      // Recurse into the children, one nested object each.
      writer->field("children", [&](JSON::ArrayWriter* writer) {
        foreachvalue (const Role* child, role->children_) {
          writer->element(
              [&](JSON::ObjectWriter* writer) { json(writer, child); });
        }
      });
    };

  auto tree = [&](JSON::ObjectWriter* writer) { json(writer, root_); };

  return jsonify(tree);
}
| |
| |
// Initializes allocator-side framework state from the framework's
// `FrameworkInfo` and registration-time flags: subscribed roles,
// suppressed roles, capabilities, per-framework metrics, and the
// unpacked minimum-allocatable-resources offer filters.
Framework::Framework(
    const FrameworkInfo& frameworkInfo,
    const set<string>& _suppressedRoles,
    bool _active,
    bool publishPerFrameworkMetrics)
  : frameworkId(frameworkInfo.id()),
    roles(protobuf::framework::getRoles(frameworkInfo)),
    suppressedRoles(_suppressedRoles),
    capabilities(frameworkInfo.capabilities()),
    active(_active),
    metrics(new FrameworkMetrics(frameworkInfo, publishPerFrameworkMetrics)),
    minAllocatableResources(
        unpackFrameworkOfferFilters(frameworkInfo.offer_filters())) {}
| |
| |
| void HierarchicalAllocatorProcess::initialize( |
| const Options& _options, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<string, hashmap<SlaveID, Resources>>&)>& |
| _offerCallback, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<SlaveID, UnavailableResources>&)>& |
| _inverseOfferCallback) |
| { |
| options = _options; |
| offerCallback = _offerCallback; |
| inverseOfferCallback = _inverseOfferCallback; |
| initialized = true; |
| paused = false; |
| |
| completedFrameworkMetrics = |
| BoundedHashMap<FrameworkID, process::Owned<FrameworkMetrics>>( |
| options.maxCompletedFrameworks); |
| |
| roleSorter->initialize(options.fairnessExcludeResourceNames); |
| |
| VLOG(1) << "Initialized hierarchical allocator process"; |
| |
| // Start a loop to run allocation periodically. |
| PID<HierarchicalAllocatorProcess> _self = self(); |
| |
| // Set a temporary variable for the lambda capture. |
| Duration allocationInterval = options.allocationInterval; |
| loop( |
| None(), // Use `None` so we iterate outside the allocator process. |
| [allocationInterval]() { |
| return after(allocationInterval); |
| }, |
| [_self](const Nothing&) { |
| return dispatch(_self, &HierarchicalAllocatorProcess::generateOffers) |
| .then([]() -> ControlFlow<Nothing> { return Continue(); }); |
| }); |
| } |
| |
| |
// Prepares the allocator for master failover recovery: installs the
// recovered quotas, and — when quota is configured — pauses
// allocation until a sufficient fraction of the expected agents
// reregister or a hold-off timeout fires.
void HierarchicalAllocatorProcess::recover(
    const int _expectedAgentCount,
    const hashmap<string, Quota>& quotas)
{
  // Recovery should start before actual allocation starts.
  CHECK(initialized);
  CHECK(slaves.empty());
  CHECK(_expectedAgentCount >= 0);

  // If there is no quota, recovery is a no-op. Otherwise, we need
  // to delay allocations while agents are reregistering because
  // otherwise we perform allocations on a partial view of resources!
  // We would consequently perform unnecessary allocations to satisfy
  // quota constraints, which can over-allocate non-revocable resources
  // to roles using quota. Then, frameworks in roles without quota can
  // be unnecessarily deprived of resources. We may also be unable to
  // satisfy all of the quota constraints. Repeated master failovers
  // exacerbate the issue.

  if (quotas.empty()) {
    VLOG(1) << "Skipping recovery of hierarchical allocator:"
            << " nothing to recover";

    return;
  }

  // Re-install the recovered per-role quotas.
  foreachpair (const string& role, const Quota& quota, quotas) {
    updateQuota(role, quota);
  }

  // TODO(alexr): Consider exposing these constants.
  const Duration ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT = Minutes(10);
  const double AGENT_RECOVERY_FACTOR = 0.8;

  // Record the number of expected agents.
  expectedAgentCount =
    static_cast<int>(_expectedAgentCount * AGENT_RECOVERY_FACTOR);

  // Skip recovery if there are no expected agents. This is not strictly
  // necessary for the allocator to function correctly, but maps better
  // to expected behavior by the user: the allocator is not paused until
  // a new agent is added.
  if (expectedAgentCount.get() == 0) {
    VLOG(1) << "Skipping recovery of hierarchical allocator:"
            << " no reconnecting agents to wait for";

    return;
  }

  // Pause allocation until after a sufficient amount of agents reregister
  // or a timer expires.
  pause();

  // Setup recovery timer; `resume()` is a no-op if recovery already
  // completed via `addSlave()` before the timer fires.
  delay(ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT, self(), &Self::resume);

  LOG(INFO) << "Triggered allocator recovery: waiting for "
            << expectedAgentCount.get() << " agents to reconnect or "
            << ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT << " to pass";
}
| |
| |
// Adds a framework to the allocator: tracks it under each of its
// roles (activating or suppressing per-role as requested), accounts
// for resources it already has allocated on known agents, and
// triggers an offer cycle if the framework is active.
void HierarchicalAllocatorProcess::addFramework(
    const FrameworkID& frameworkId,
    const FrameworkInfo& frameworkInfo,
    const hashmap<SlaveID, Resources>& used,
    bool active,
    const set<string>& suppressedRoles)
{
  CHECK(initialized);
  CHECK_NOT_CONTAINS(frameworks, frameworkId);

  // TODO(mzhu): remove the `frameworkId` parameter.
  CHECK_EQ(frameworkId, frameworkInfo.id());

  frameworks.insert({frameworkId,
                     Framework(
                         frameworkInfo,
                         suppressedRoles,
                         active,
                         options.publishPerFrameworkMetrics)});

  const Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId));

  // Track the framework under each subscribed role; roles listed in
  // `suppressedRoles` start out deactivated in their sorter.
  foreach (const string& role, framework.roles) {
    trackFrameworkUnderRole(framework, role);

    Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

    if (suppressedRoles.count(role)) {
      frameworkSorter->deactivate(frameworkId.value());
      framework.metrics->suppressRole(role);
    } else {
      frameworkSorter->activate(frameworkId.value());
      framework.metrics->reviveRole(role);
    }
  }

  // Update the allocation for this framework.
  foreachpair (const SlaveID& slaveId, const Resources& resources, used) {
    // TODO(bmahler): The master won't tell us about resources
    // allocated to agents that have not yet been added, consider
    // CHECKing this case.
    if (!slaves.contains(slaveId)) {
      continue;
    }

    // The slave struct will already be aware of the allocated
    // resources, so we only need to track them in the sorters.
    trackAllocatedResources(slaveId, frameworkId, resources);

    roleTree.trackAllocated(slaveId, resources);
  }

  LOG(INFO) << "Added framework " << frameworkId;

  if (active) {
    generateOffers();
  } else {
    deactivateFramework(frameworkId);
  }
}
| |
| |
// Removes a framework from the allocator, recovering all resources
// offered to or allocated to it across every agent and retiring its
// metrics into the bounded completed-frameworks map.
void HierarchicalAllocatorProcess::removeFramework(
    const FrameworkID& frameworkId)
{
  CHECK(initialized);

  // To free up offered or allocated resources of a framework, we need to
  // do two things: update available resources in the agent and update
  // tracking info in the role tree and role sorter.
  // We do both at the same time.
  foreachvalue (Slave& slave, slaves) {
    const hashmap<FrameworkID, Resources>& offeredOrAllocated =
      slave.getOfferedOrAllocated();
    auto frameworkResources = offeredOrAllocated.find(frameworkId);

    if (frameworkResources == offeredOrAllocated.end()) {
      continue;
    }

    VLOG(1) << "Recovering " << frameworkResources->second
            << " from removing framework " << frameworkId
            << " (agent total: " << slave.getTotal() << ","
            << " offered or allocated: "
            << slave.getTotalOfferedOrAllocated() << ")";

    untrackAllocatedResources(
        slave.id, frameworkId, frameworkResources->second);

    // Note: this method might mutate `offeredOrAllocated`.
    slave.increaseAvailable(frameworkId, frameworkResources->second);
  }

  Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId));

  // Untrack framework from roles.
  foreach (const string& role, framework.roles) {
    CHECK(tryUntrackFrameworkUnderRole(framework, role))
      << " Framework: " << frameworkId << " role: " << role;
  }

  // Transfer ownership of this framework's metrics to
  // `completedFrameworkMetrics`.
  completedFrameworkMetrics.set(
      frameworkId,
      Owned<FrameworkMetrics>(framework.metrics.release()));

  frameworks.erase(frameworkId);

  LOG(INFO) << "Removed framework " << frameworkId;
}
| |
| |
| void HierarchicalAllocatorProcess::activateFramework( |
| const FrameworkID& frameworkId) |
| { |
| CHECK(initialized); |
| |
| Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); |
| |
| framework.active = true; |
| |
| // Activate all roles for this framework except the roles that |
| // are marked as deactivated. |
| // Note: A subset of framework roles can be deactivated if the |
| // role is specified in `suppressed_roles` during framework |
| // (re)registration, or via a subsequent `SUPPRESS` call. |
| foreach (const string& role, framework.roles) { |
| Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); |
| |
| if (!framework.suppressedRoles.count(role)) { |
| frameworkSorter->activate(frameworkId.value()); |
| } |
| } |
| |
| LOG(INFO) << "Activated framework " << frameworkId; |
| |
| generateOffers(); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::deactivateFramework( |
| const FrameworkID& frameworkId) |
| { |
| CHECK(initialized); |
| |
| Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); |
| |
| foreach (const string& role, framework.roles) { |
| Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); |
| |
| frameworkSorter->deactivate(frameworkId.value()); |
| |
| // Note that the Sorter *does not* remove the resources allocated |
| // to this framework. For now, this is important because if the |
| // framework fails over and is activated, we still want a record |
| // of the resources that it is using. We might be able to collapse |
| // the added/removed and activated/deactivated in the future. |
| } |
| |
| framework.active = false; |
| |
| framework.offerFilters.clear(); |
| framework.inverseOfferFilters.clear(); |
| |
| LOG(INFO) << "Deactivated framework " << frameworkId; |
| } |
| |
| |
// Applies an updated `FrameworkInfo`: reconciles the subscribed role
// set (tracking/untracking roles and (de)activating the framework in
// the per-role sorters), refreshes capabilities and offer filters,
// and reconciles the suppressed-role set.
void HierarchicalAllocatorProcess::updateFramework(
    const FrameworkID& frameworkId,
    const FrameworkInfo& frameworkInfo,
    const set<string>& suppressedRoles)
{
  CHECK(initialized);

  Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId));

  // Copies (not references): `framework.roles` and
  // `framework.suppressedRoles` are mutated below.
  const set<string> oldRoles = framework.roles;
  const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
  const set<string> oldSuppressedRoles = framework.suppressedRoles;

  // Newly subscribed roles.
  foreach (const string& role, newRoles - oldRoles) {
    framework.metrics->addSubscribedRole(role);

    // NOTE: It's possible that we're already tracking this framework
    // under the role because a framework can unsubscribe from a role
    // while it still has resources allocated to the role.
    if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
      // TODO(mzhu): `CHECK` the above case.
      trackFrameworkUnderRole(framework, role);
    }

    frameworkSorters.at(role)->activate(frameworkId.value());
  }

  // Unsubscribed roles.
  foreach (const string& role, oldRoles - newRoles) {
    Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

    frameworkSorter->deactivate(frameworkId.value());

    tryUntrackFrameworkUnderRole(framework, role);

    if (framework.offerFilters.contains(role)) {
      framework.offerFilters.erase(role);
    }

    framework.metrics->removeSubscribedRole(role);
    framework.suppressedRoles.erase(role);
  }

  framework.roles = newRoles;
  framework.capabilities = frameworkInfo.capabilities();
  framework.minAllocatableResources =
    unpackFrameworkOfferFilters(frameworkInfo.offer_filters());

  // Suppress newly suppressed roles; revive roles that are no longer
  // suppressed (restricted to roles still subscribed).
  suppressRoles(framework, suppressedRoles - oldSuppressedRoles);
  reviveRoles(framework, (oldSuppressedRoles - suppressedRoles) & newRoles);

  CHECK(framework.suppressedRoles == suppressedRoles)
    << "After updating framework " << frameworkId
    << " its set of suppressed roles " << stringify(framework.suppressedRoles)
    << " differs from required " << stringify(suppressedRoles);
}
| |
| |
// Adds a new agent (and its total/used resources) to the allocator's
// books, possibly completes post-failover recovery once enough agents
// have been added, and triggers an offer cycle for the new agent.
void HierarchicalAllocatorProcess::addSlave(
    const SlaveID& slaveId,
    const SlaveInfo& slaveInfo,
    const vector<SlaveInfo::Capability>& capabilities,
    const Option<Unavailability>& unavailability,
    const Resources& total,
    const hashmap<FrameworkID, Resources>& used)
{
  CHECK(initialized);
  CHECK_NOT_CONTAINS(slaves, slaveId);
  CHECK_EQ(slaveId, slaveInfo.id());
  CHECK(!paused || expectedAgentCount.isSome());

  slaves.insert({slaveId,
                 Slave(
                     slaveInfo,
                     protobuf::slave::Capabilities(capabilities),
                     true,
                     total,
                     used)});

  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

  // NOTE: We currently implement maintenance in the allocator to be able to
  // leverage state and features such as the FrameworkSorter and OfferFilter.
  if (unavailability.isSome()) {
    slave.maintenance = Slave::Maintenance(unavailability.get());
  }

  // Account for reservations that exist on the agent.
  roleTree.trackReservations(total.reserved());

  const ResourceQuantities agentScalarQuantities =
    ResourceQuantities::fromScalarResources(total.scalars());

  // Add the agent's scalars to the cluster totals and to every sorter.
  totalScalarQuantities += agentScalarQuantities;
  roleSorter->addSlave(slaveId, agentScalarQuantities);

  foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
    sorter->addSlave(slaveId, agentScalarQuantities);
  }

  foreachpair (const FrameworkID& frameworkId,
               const Resources& allocation,
               used) {
    // There are two cases here:
    //
    // (1) The framework has already been added to the allocator.
    //     In this case, we track the allocation in the sorters.
    //
    // (2) The framework has not yet been added to the allocator.
    //     The master will imminently add the framework using
    //     the `FrameworkInfo` recovered from the agent, and in
    //     the interim we do not track the resources allocated to
    //     this framework. This leaves a small window where the
    //     role sorting will under-account for the roles belonging
    //     to this framework.
    //
    // TODO(bmahler): Fix the issue outlined in (2).
    if (!frameworks.contains(frameworkId)) {
      continue;
    }

    trackAllocatedResources(slaveId, frameworkId, allocation);

    roleTree.trackAllocated(slaveId, allocation);
  }

  // If we have just a number of recovered agents, we cannot distinguish
  // between "old" agents from the registry and "new" ones joined after
  // recovery has started. Because we do not persist enough information
  // to base logical decisions on, any accounting algorithm here will be
  // crude. Hence we opted for checking whether a certain amount of cluster
  // capacity is back online, so that we are reasonably confident that we
  // will not over-commit too many resources to quota that we will not be
  // able to revoke.
  if (paused &&
      expectedAgentCount.isSome() &&
      (static_cast<int>(slaves.size()) >= expectedAgentCount.get())) {
    VLOG(1) << "Recovery complete: sufficient amount of agents added; "
            << slaves.size() << " agents known to the allocator";

    expectedAgentCount = None();
    resume();
  }

  LOG(INFO)
    << "Added agent " << slaveId << " (" << slave.info.hostname() << ")"
    << " with " << slave.getTotal()
    << " (offered or allocated: " << slave.getTotalOfferedOrAllocated() << ")";

  generateOffers(slaveId);
}
| |
| |
// Removes all allocator state associated with the given agent: allocated
// and offered resource tracking, reservation tracking, sorter state,
// allocation candidacy, and any filters referencing the agent.
void HierarchicalAllocatorProcess::removeSlave(
    const SlaveID& slaveId)
{
  CHECK(initialized);

  {
    const Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

    // untrackAllocatedResources() potentially removes allocation roles, thus
    // we need to untrack actually allocated resources in the roles tree first.
    roleTree.untrackAllocated(slaveId, slave.totalAllocated);

    // Untrack resources in roleTree and sorter.
    foreachpair (
        const FrameworkID& frameworkId,
        const Resources& resources,
        slave.getOfferedOrAllocated()) {
      untrackAllocatedResources(slaveId, frameworkId, resources);
    }

    roleSorter->removeSlave(slaveId);

    foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
      sorter->removeSlave(slaveId);
    }

    roleTree.untrackReservations(slave.getTotal().reserved());

    // Subtract the agent's scalar quantities from the cluster-wide total.
    // The CHECK guards against accounting drift / double removal.
    const ResourceQuantities agentScalarQuantities =
      ResourceQuantities::fromScalarResources(slave.getTotal().scalars());

    CHECK_CONTAINS(totalScalarQuantities, agentScalarQuantities);
    totalScalarQuantities -= agentScalarQuantities;
  }

  // NOTE: The scope above ends before the erase below so that the `slave`
  // reference (which points into `slaves`) cannot be used after removal.
  slaves.erase(slaveId);
  allocationCandidates.erase(slaveId);

  removeFilters(slaveId);

  LOG(INFO) << "Removed agent " << slaveId;
}
| |
| |
// Updates an existing agent's info, capabilities, and/or total resources.
// If anything actually changed, triggers offer generation for the agent.
void HierarchicalAllocatorProcess::updateSlave(
    const SlaveID& slaveId,
    const SlaveInfo& info,
    const Option<Resources>& total,
    const Option<vector<SlaveInfo::Capability>>& capabilities)
{
  CHECK(initialized);
  CHECK_EQ(slaveId, info.id());

  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

  // Tracks whether any observable agent state changed; offers are only
  // (re)generated at the end if this becomes true.
  bool updated = false;

  // Remove all offer filters for this slave if it was restarted with changed
  // attributes. We do this because schedulers might have decided that they're
  // not interested in offers from this slave based on the non-presence of some
  // required attributes, and right now they have no other way of learning
  // about this change.
  // TODO(bennoe): Once the agent lifecycle design is implemented, there is a
  // better way to notify frameworks about such changes and let them make this
  // decision. We should think about ways to safely remove this check at that
  // point in time.
  if (!(Attributes(info.attributes()) == Attributes(slave.info.attributes()))) {
    updated = true;
    removeFilters(slaveId);
  }

  if (!(slave.info == info)) {
    updated = true;

    // We unconditionally overwrite the old domain and hostname: Even though
    // the master places some restrictions on this (i.e. agents are not allowed
    // to reregister with a different hostname) inside the allocator it
    // doesn't matter, as the algorithm will work correctly either way.
    slave.info = info;
  }

  // Update agent capabilities.
  if (capabilities.isSome()) {
    protobuf::slave::Capabilities newCapabilities(capabilities.get());
    protobuf::slave::Capabilities oldCapabilities(slave.capabilities);

    slave.capabilities = newCapabilities;

    if (newCapabilities != oldCapabilities) {
      updated = true;

      LOG(INFO) << "Agent " << slaveId << " (" << slave.info.hostname() << ")"
                << " updated with capabilities " << slave.capabilities;
    }
  }

  if (total.isSome()) {
    // NOTE: `updateSlaveTotal()` is deliberately the left operand of `||`
    // so that it is always invoked; writing `updated || ...` would
    // short-circuit and skip the total update once `updated` is true.
    updated = updateSlaveTotal(slaveId, total.get()) || updated;

    LOG(INFO) << "Agent " << slaveId << " (" << slave.info.hostname() << ")"
              << " updated with total resources " << total.get();
  }

  if (updated) {
    generateOffers(slaveId);
  }
}
| |
| |
// Grows the given agent by the resource provider's total resources and
// tracks any of those resources that are already in use by frameworks.
void HierarchicalAllocatorProcess::addResourceProvider(
    const SlaveID& slaveId,
    const Resources& total,
    const hashmap<FrameworkID, Resources>& used)
{
  CHECK(initialized);

  // Grow the agent's total first so that the subsequent per-framework
  // allocation tracking below fits within the new total.
  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));
  updateSlaveTotal(slaveId, slave.getTotal() + total);

  foreachpair (const FrameworkID& frameworkId,
               const Resources& allocation,
               used) {
    // There are two cases here:
    //
    // (1) The framework has already been added to the allocator.
    //     In this case, we track the allocation in the sorters
    //     and the role tree.
    //
    // (2) The framework has not yet been added to the allocator.
    //     We do not track the resources allocated to this
    //     framework. This leaves a small window where the role
    //     sorting will under-account for the roles belonging
    //     to this framework. This case should never occur since
    //     the master will always add the framework first.
    if (!frameworks.contains(frameworkId)) {
      continue;
    }

    // Mark the used resources as unavailable on the agent before tracking
    // them as allocated in the role tree and sorters.
    slave.decreaseAvailable(frameworkId, allocation);
    trackAllocatedResources(slaveId, frameworkId, allocation);
  }

  VLOG(1)
    << "Grew agent " << slaveId << " by "
    << total << " (total), "
    << used << " (used)";
}
| |
| |
| void HierarchicalAllocatorProcess::removeFilters(const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| |
| foreachvalue (Framework& framework, frameworks) { |
| framework.inverseOfferFilters.erase(slaveId); |
| |
| // Need a typedef here, otherwise the preprocessor gets confused |
| // by the comma in the template argument list. |
| typedef hashmap<SlaveID, hashset<shared_ptr<OfferFilter>>> Filters; |
| foreachvalue (Filters& filters, framework.offerFilters) { |
| filters.erase(slaveId); |
| } |
| } |
| |
| LOG(INFO) << "Removed all filters for agent " << slaveId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::activateSlave( |
| const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| |
| Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); |
| slave.activated = true; |
| |
| LOG(INFO) << "Agent " << slaveId << " reactivated"; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::deactivateSlave( |
| const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| |
| Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); |
| slave.activated = false; |
| |
| LOG(INFO) << "Agent " << slaveId << " deactivated"; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateWhitelist( |
| const Option<hashset<string>>& _whitelist) |
| { |
| CHECK(initialized); |
| |
| whitelist = _whitelist; |
| |
| if (whitelist.isSome()) { |
| LOG(INFO) << "Updated agent whitelist: " << stringify(whitelist.get()); |
| |
| if (whitelist->empty()) { |
| LOG(WARNING) << "Whitelist is empty, no offers will be made!"; |
| } |
| } else { |
| LOG(INFO) << "Advertising offers for all agents"; |
| } |
| } |
| |
| |
void HierarchicalAllocatorProcess::requestResources(
    const FrameworkID& frameworkId,
    const vector<Request>& requests)
{
  CHECK(initialized);

  // No action is taken on resource requests here; they are only logged.
  LOG(INFO) << "Received resource request from framework " << frameworkId;
}
| |
| |
// Applies the given resource conversions to resources currently offered to
// the framework on the agent, keeping the per-agent bookkeeping, role tree,
// sorters, and the agent's total resources consistent with the result.
void HierarchicalAllocatorProcess::updateAllocation(
    const FrameworkID& frameworkId,
    const SlaveID& slaveId,
    const Resources& offeredResources,
    const vector<ResourceConversion>& conversions)
{
  CHECK(initialized);
  CHECK_CONTAINS(frameworks, frameworkId);

  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

  // We require that an allocation is tied to a single role.
  //
  // TODO(bmahler): The use of `Resources::allocations()` induces
  // unnecessary copying of `Resources` objects (which is expensive
  // at the time this was written).
  hashmap<string, Resources> allocations = offeredResources.allocations();

  CHECK_EQ(1u, allocations.size());

  string role = allocations.begin()->first;

  Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

  // Snapshot of the framework's allocation on this agent before the
  // conversions are applied; compared against the post-update allocation
  // at the end of this function.
  const Resources frameworkAllocation =
    frameworkSorter->allocation(frameworkId.value(), slaveId);

  // We keep a copy of the offered resources here and it is updated
  // by the specified resource conversions.
  //
  // The resources in the resource conversions should have been
  // normalized by the master (contains proper AllocationInfo).
  //
  // TODO(bmahler): Check that the resources in the resource
  // conversions have AllocationInfo set. The master should enforce
  // this. E.g.
  //
  //  foreach (const ResourceConversion& conversion, conversions) {
  //    CHECK_NONE(validateConversionOnAllocatedResources(conversion));
  //  }
  Resources updatedOfferedResources =
    CHECK_NOTERROR(offeredResources.apply(conversions));

  // Update the per-slave allocation: swap the pre-conversion offered
  // resources for the post-conversion ones.
  slave.increaseAvailable(frameworkId, offeredResources);
  slave.decreaseAvailable(frameworkId, updatedOfferedResources);

  // Same swap in the role tree's offered-or-allocated tracking.
  roleTree.untrackOfferedOrAllocated(slaveId, offeredResources);
  roleTree.trackOfferedOrAllocated(slaveId, updatedOfferedResources);

  // Update the allocation in the framework sorter.
  frameworkSorter->update(
      frameworkId.value(),
      slaveId,
      offeredResources,
      updatedOfferedResources);

  // Update the allocation in the role sorter.
  roleSorter->update(
      role,
      slaveId,
      offeredResources,
      updatedOfferedResources);

  // Update the agent total resources so they are consistent with the updated
  // allocation. We do not directly use `updatedOfferedResources` here because
  // the agent's total resources shouldn't contain:
  // 1. The additionally allocated shared resources.
  // 2. `AllocationInfo` as set in `updatedOfferedResources`.
  //
  // We strip `AllocationInfo` from conversions in order to apply them
  // successfully, since agent's total is stored as unallocated resources.
  vector<ResourceConversion> strippedConversions;
  Resources removedResources;
  foreach (const ResourceConversion& conversion, conversions) {
    // TODO(jieyu): Ideally, we should make sure agent's total
    // resources are consistent with agent's allocation in terms of
    // shared resources. In other words, we should increase agent's
    // total resources as well for those additional allocation we did
    // for shared resources. However, that means we need to update the
    // agent's total resources when performing allocation for shared
    // resources (in `__allocate()`). For now, we detect "additional"
    // allocation for shared resources by checking if a conversion has
    // an empty `consumed` field.
    if (conversion.consumed.empty()) {
      continue;
    }

    // NOTE: For now, a resource conversion must either not change the resource
    // quantities, or completely remove the consumed resources. See MESOS-8825.
    if (conversion.converted.empty()) {
      removedResources += conversion.consumed;
    }

    Resources consumed = conversion.consumed;
    Resources converted = conversion.converted;

    consumed.unallocate();
    converted.unallocate();

    strippedConversions.emplace_back(consumed, converted);
  }

  Try<Resources> updatedTotal = slave.getTotal().apply(strippedConversions);
  CHECK_SOME(updatedTotal);

  updateSlaveTotal(slaveId, updatedTotal.get());

  const Resources updatedFrameworkAllocation =
    frameworkSorter->allocation(frameworkId.value(), slaveId);

  // Check that the changed quantities of the framework's allocation is exactly
  // the same as the resources removed by the resource conversions.
  //
  // TODO(chhsiao): Revisit this constraint if we want to support other type of
  // resource conversions. See MESOS-9015.
  const Resources removedAllocationQuantities =
    frameworkAllocation.createStrippedScalarQuantity() -
    updatedFrameworkAllocation.createStrippedScalarQuantity();
  CHECK_EQ(
      removedAllocationQuantities,
      removedResources.createStrippedScalarQuantity());

  LOG(INFO) << "Updated allocation of framework " << frameworkId
            << " on agent " << slaveId
            << " from " << frameworkAllocation
            << " to " << updatedFrameworkAllocation;
}
| |
| |
// Attempts to apply the given operations to the agent's available (i.e.
// not offered or allocated) resources. Returns a Failure if the operations
// cannot be applied to the currently available resources; on success the
// agent's total is updated accordingly.
Future<Nothing> HierarchicalAllocatorProcess::updateAvailable(
    const SlaveID& slaveId,
    const vector<Offer::Operation>& operations)
{
  // Note that the operations may contain allocated resources,
  // however such operations can be applied to unallocated
  // resources unambiguously, so we don't have a strict CHECK
  // for the operations to contain only unallocated resources.

  CHECK(initialized);

  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

  // It's possible for this 'apply' to fail here because a call to
  // 'allocate' could have been enqueued by the allocator itself
  // just before master's request to enqueue 'updateAvailable'
  // arrives to the allocator.
  //
  //   Master -------R------------
  //                  \----+
  //                       |
  //   Allocator --A-----A-U---A--
  //                \___/ \___/
  //
  //   where A = allocate, R = reserve, U = updateAvailable
  Try<Resources> updatedAvailable = slave.getAvailable().apply(operations);
  if (updatedAvailable.isError()) {
    VLOG(1) << "Failed to update available resources on agent " << slaveId
            << ": " << updatedAvailable.error();
    return Failure(updatedAvailable.error());
  }

  // Update the total resources. This cannot fail: the operations applied
  // cleanly to the available resources above, which are a subset of total.
  Try<Resources> updatedTotal = slave.getTotal().apply(operations);
  CHECK_SOME(updatedTotal);

  // Update the total resources in the sorter.
  updateSlaveTotal(slaveId, updatedTotal.get());

  return Nothing();
}
| |
| |
| void HierarchicalAllocatorProcess::updateUnavailability( |
| const SlaveID& slaveId, |
| const Option<Unavailability>& unavailability) |
| { |
| CHECK(initialized); |
| |
| Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); |
| |
| // NOTE: We currently implement maintenance in the allocator to be able to |
| // leverage state and features such as the FrameworkSorter and OfferFilter. |
| |
| // We explicitly remove all filters for the inverse offers of this slave. We |
| // do this because we want to force frameworks to reassess the calculations |
| // they have made to respond to the inverse offer. Unavailability of a slave |
| // can have a large effect on failure domain calculations and inter-leaved |
| // unavailability schedules. |
| foreachvalue (Framework& framework, frameworks) { |
| framework.inverseOfferFilters.erase(slaveId); |
| } |
| |
| // Remove any old unavailability. |
| slave.maintenance = None(); |
| |
| // If we have a new unavailability. |
| if (unavailability.isSome()) { |
| slave.maintenance = Slave::Maintenance(unavailability.get()); |
| } |
| |
| generateOffers(slaveId); |
| } |
| |
| |
// Records a framework's response (or timeout/rescind, when `status` is none)
// to an outstanding inverse offer for a maintenance-scheduled agent, and
// optionally installs a refused inverse offer filter per `filters`.
void HierarchicalAllocatorProcess::updateInverseOffer(
    const SlaveID& slaveId,
    const FrameworkID& frameworkId,
    const Option<UnavailableResources>& unavailableResources,
    const Option<InverseOfferStatus>& status,
    const Option<Filters>& filters)
{
  CHECK(initialized);

  Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId));
  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

  CHECK(slave.maintenance.isSome())
    << "Agent " << slaveId
    << " (" << slave.info.hostname() << ") should have maintenance scheduled";

  // NOTE: We currently implement maintenance in the allocator to be able to
  // leverage state and features such as the FrameworkSorter and OfferFilter.

  // We use a reference by alias because we intend to modify the
  // `maintenance` and to improve readability.
  Slave::Maintenance& maintenance = slave.maintenance.get();

  // Only handle inverse offers that we currently have outstanding. If it is not
  // currently outstanding this means it is old and can be safely ignored.
  if (maintenance.offersOutstanding.contains(frameworkId)) {
    // We always remove the outstanding offer so that we will send a new offer
    // out the next time we schedule inverse offers.
    maintenance.offersOutstanding.erase(frameworkId);

    // If the response is `Some`, this means the framework responded. Otherwise
    // if it is `None` the inverse offer timed out or was rescinded.
    if (status.isSome()) {
      // For now we don't allow frameworks to respond with `UNKNOWN`. The caller
      // should guard against this. This goes against the pattern of not
      // checking external invariants; however, the allocator and master are
      // currently so tightly coupled that this check is valuable.
      CHECK_NE(status->status(), InverseOfferStatus::UNKNOWN);

      // If the framework responded, we update our state to match.
      maintenance.statuses[frameworkId].CopyFrom(status.get());
    }
  }

  // No need to install filters if `filters` is none.
  if (filters.isNone()) {
    return;
  }

  // Create a refused inverse offer filter.
  // Start from the protobuf default of `refuse_seconds`; the branches
  // below override this with a clamped/validated value.
  Try<Duration> timeout = Duration::create(Filters().refuse_seconds());

  if (filters->refuse_seconds() > Days(365).secs()) {
    LOG(WARNING) << "Using 365 days to create the refused inverse offer"
                 << " filter because the input value is too big";

    timeout = Days(365);
  } else if (filters->refuse_seconds() < 0) {
    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create"
                 << " the refused inverse offer filter because the input"
                 << " value is negative";

    timeout = Duration::create(Filters().refuse_seconds());
  } else {
    timeout = Duration::create(filters->refuse_seconds());

    if (timeout.isError()) {
      LOG(WARNING) << "Using the default value of 'refuse_seconds' to create"
                   << " the refused inverse offer filter because the input"
                   << " value is invalid: " + timeout.error();

      timeout = Duration::create(Filters().refuse_seconds());
    }
  }

  CHECK_SOME(timeout);

  // A zero timeout means no filtering at all, so no filter is installed.
  if (timeout.get() != Duration::zero()) {
    VLOG(1) << "Framework " << frameworkId
            << " filtered inverse offers from agent " << slaveId
            << " for " << timeout.get();

    // Create a new inverse offer filter and delay its expiration.
    shared_ptr<RefusedInverseOfferFilter> inverseOfferFilter =
      make_shared<RefusedInverseOfferFilter>(*timeout);

    framework.inverseOfferFilters[slaveId].insert(inverseOfferFilter);

    // Hand `expire()` a weak pointer so the filter can be removed (e.g. by
    // revive or agent removal) without keeping it alive via this callback.
    weak_ptr<InverseOfferFilter> weakPtr = inverseOfferFilter;

    inverseOfferFilter->expired()
      .onReady(defer(self(), [=](Nothing) {
        expire(frameworkId, slaveId, weakPtr);
      }));
  }
}
| |
| |
| Future<hashmap<SlaveID, hashmap<FrameworkID, InverseOfferStatus>>> |
| HierarchicalAllocatorProcess::getInverseOfferStatuses() |
| { |
| CHECK(initialized); |
| |
| hashmap<SlaveID, hashmap<FrameworkID, InverseOfferStatus>> result; |
| |
| // Make a copy of the most recent statuses. |
| foreachpair (const SlaveID& id, const Slave& slave, slaves) { |
| if (slave.maintenance.isSome()) { |
| result[id] = slave.maintenance->statuses; |
| } |
| } |
| |
| return result; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::transitionOfferedToAllocated( |
| const SlaveID& slaveId, |
| const Resources& resources) |
| { |
| CHECK_NOTNONE(getSlave(slaveId))->totalAllocated += resources; |
| roleTree.trackAllocated(slaveId, resources); |
| } |
| |
| |
// Returns resources (declined or unused offers, or resources freed by
// terminated tasks when `isAllocated` is true) from a framework back to the
// agent's available pool, and optionally installs a refused offer filter.
void HierarchicalAllocatorProcess::recoverResources(
    const FrameworkID& frameworkId,
    const SlaveID& slaveId,
    const Resources& resources,
    const Option<Filters>& filters,
    bool isAllocated)
{
  CHECK(initialized);

  if (resources.empty()) {
    return;
  }

  Option<Slave*> slave = getSlave(slaveId);

  // Even if the framework is gone, allocated-resource accounting on a
  // still-present agent must be unwound.
  if (isAllocated && slave.isSome()) {
    CHECK_CONTAINS((*slave)->totalAllocated, resources);
    (*slave)->totalAllocated -= resources;
    roleTree.untrackAllocated(slaveId, resources);
  }

  Option<Framework*> framework = getFramework(frameworkId);
  // No work to do if either the framework or the agent no longer exists.
  //
  // The framework may not exist if we dispatched Master::offer before we
  // received MesosAllocatorProcess::removeFramework or
  // MesosAllocatorProcess::deactivateFramework, in which case we will
  // have already recovered all of its resources).
  //
  // The agent may not exist if we dispatched Master::offer before we
  // received `removeSlave`.
  if (framework.isNone() || slave.isNone()) {
    return;
  }

  // For now, we require that resources are recovered within a single
  // allocation role (since filtering in the same manner across roles
  // seems undesirable).
  //
  // TODO(bmahler): The use of `Resources::allocations()` induces
  // unnecessary copying of `Resources` objects (which is expensive
  // at the time this was written).
  hashmap<string, Resources> allocations = resources.allocations();

  CHECK_EQ(1u, allocations.size());

  string role = allocations.begin()->first;

  // Update resources on the agent.

  CHECK((*slave)->getTotalOfferedOrAllocated().contains(resources))
    << "agent " << slaveId << " resources "
    << (*slave)->getTotalOfferedOrAllocated() << " do not contain "
    << resources;

  (*slave)->increaseAvailable(frameworkId, resources);

  VLOG(1) << "Recovered " << resources << " (total: " << (*slave)->getTotal()
          << ", offered or allocated: "
          << (*slave)->getTotalOfferedOrAllocated() << ")"
          << " on agent " << slaveId << " from framework " << frameworkId;

  // Update role tree and sorter.

  Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

  // The framework may have already been removed from this sorter (e.g. it
  // no longer subscribes to the role), in which case its allocations were
  // already untracked.
  if (frameworkSorter->contains(frameworkId.value())) {
    untrackAllocatedResources(slaveId, frameworkId, resources);
  }

  // No need to install the filter if 'filters' is none.
  if (filters.isNone()) {
    return;
  }

  // Update filters.

  // Create a refused resources filter.
  // Start from the protobuf default of `refuse_seconds`; the branches
  // below override this with a clamped/validated value.
  Try<Duration> timeout = Duration::create(Filters().refuse_seconds());

  if (filters->refuse_seconds() > Days(365).secs()) {
    LOG(WARNING) << "Using 365 days to create the refused resources offer"
                 << " filter because the input value is too big";

    timeout = Days(365);
  } else if (filters->refuse_seconds() < 0) {
    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create"
                 << " the refused resources offer filter because the input"
                 << " value is negative";

    timeout = Duration::create(Filters().refuse_seconds());
  } else {
    timeout = Duration::create(filters->refuse_seconds());

    if (timeout.isError()) {
      LOG(WARNING) << "Using the default value of 'refuse_seconds' to create"
                   << " the refused resources offer filter because the input"
                   << " value is invalid: " + timeout.error();

      timeout = Duration::create(Filters().refuse_seconds());
    }
  }

  CHECK_SOME(timeout);

  // A zero timeout means no filtering at all, so no filter is installed.
  if (timeout.get() != Duration::zero()) {
    VLOG(1) << "Framework " << frameworkId
            << " filtered agent " << slaveId
            << " for " << timeout.get();

    // Expire the filter after both an `allocationInterval` and the
    // `timeout` have elapsed. This ensures that the filter does not
    // expire before we perform the next allocation for this agent,
    // see MESOS-4302 for more information.
    //
    // Because the next periodic allocation goes through a dispatch
    // after `allocationInterval`, we do the same for `expire()`
    // (with a helper `_expire()`) to achieve the above.
    //
    // TODO(alexr): If we allocated upon resource recovery
    // (MESOS-3078), we would not need to increase the timeout here.
    timeout = std::max(options.allocationInterval, timeout.get());

    // Create a new filter. Note that we unallocate the resources
    // since filters are applied per-role already.
    Resources unallocated = resources;
    unallocated.unallocate();

    shared_ptr<RefusedOfferFilter> offerFilter =
      make_shared<RefusedOfferFilter>(unallocated, *timeout);

    (*framework)->offerFilters[role][slaveId].insert(offerFilter);

    // Hand `expire()` a weak pointer so the filter can be removed (e.g. by
    // revive or agent removal) without keeping it alive via this callback.
    weak_ptr<OfferFilter> weakPtr = offerFilter;

    offerFilter->expired()
      .onReady(defer(self(), [=](Nothing) {
        expire(frameworkId, role, slaveId, weakPtr);
      }));
  }
}
| |
| |
| void HierarchicalAllocatorProcess::suppressRoles( |
| Framework& framework, const set<string>& roles) |
| { |
| CHECK(initialized); |
| |
| // Deactivating the framework in the sorter is fine as long as |
| // SUPPRESS is not parameterized. When parameterization is added, |
| // we have to differentiate between the cases here. |
| |
| foreach (const string& role, roles) { |
| Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); |
| |
| frameworkSorter->deactivate(framework.frameworkId.value()); |
| framework.suppressedRoles.insert(role); |
| framework.metrics->suppressRole(role); |
| } |
| |
| // TODO(bmahler): This logs roles that were already suppressed, |
| // only log roles that transitioned from unsuppressed -> suppressed. |
| LOG(INFO) << "Suppressed offers for roles " << stringify(roles) |
| << " of framework " << framework.frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::suppressOffers( |
| const FrameworkID& frameworkId, |
| const set<string>& roles_) |
| { |
| Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); |
| |
| const set<string>& roles = roles_.empty() ? framework.roles : roles_; |
| suppressRoles(framework, roles); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::reviveRoles( |
| Framework& framework, const set<string>& roles) |
| { |
| CHECK(initialized); |
| |
| framework.inverseOfferFilters.clear(); |
| |
| foreach (const string& role, roles) { |
| framework.offerFilters.erase(role); |
| } |
| |
| // Activating the framework in the sorter is fine as long as |
| // SUPPRESS is not parameterized. When parameterization is added, |
| // we may need to differentiate between the cases here. |
| foreach (const string& role, roles) { |
| Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); |
| |
| frameworkSorter->activate(framework.frameworkId.value()); |
| framework.suppressedRoles.erase(role); |
| framework.metrics->reviveRole(role); |
| } |
| |
| // TODO(bmahler): This logs roles that were already unsuppressed, |
| // only log roles that transitioned from suppressed -> unsuppressed. |
| LOG(INFO) << "Unsuppressed offers and cleared filters for roles " |
| << stringify(roles) << " of framework " << framework.frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::reviveOffers( |
| const FrameworkID& frameworkId, |
| const set<string>& roles) |
| { |
| CHECK(initialized); |
| |
| Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId)); |
| |
| reviveRoles(framework, roles.empty() ? framework.roles : roles); |
| |
| generateOffers(); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateQuota( |
| const string& role, const Quota& quota) |
| { |
| CHECK(initialized); |
| |
| roleTree.updateQuota(role, quota); |
| metrics.updateQuota(role, quota); |
| |
| LOG(INFO) << "Updated quota for role '" << role << "', " |
| << " guarantees: " << quota.guarantees |
| << " limits: " << quota.limits; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateWeights( |
| const vector<WeightInfo>& weightInfos) |
| { |
| CHECK(initialized); |
| |
| foreach (const WeightInfo& weightInfo, weightInfos) { |
| CHECK(weightInfo.has_role()); |
| roleTree.updateWeight(weightInfo.role(), weightInfo.weight()); |
| roleSorter->updateWeight(weightInfo.role(), weightInfo.weight()); |
| } |
| |
| // NOTE: Since weight changes do not result in rebalancing of |
| // offered resources, we do not trigger an allocation here; the |
| // weight change will be reflected in subsequent allocations. |
| // |
| // If we add the ability for weight changes to incur a rebalancing |
| // of offered resources, then we should trigger that here. |
| } |
| |
| |
| void HierarchicalAllocatorProcess::pause() |
| { |
| if (!paused) { |
| VLOG(1) << "Allocation paused"; |
| |
| paused = true; |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::resume() |
| { |
| if (paused) { |
| VLOG(1) << "Allocation resumed"; |
| |
| paused = false; |
| } |
| } |
| |
| |
| Future<Nothing> HierarchicalAllocatorProcess::generateOffers() |
| { |
| return generateOffers(slaves.keys()); |
| } |
| |
| |
| Future<Nothing> HierarchicalAllocatorProcess::generateOffers( |
| const SlaveID& slaveId) |
| { |
| hashset<SlaveID> slaves({slaveId}); |
| return generateOffers(slaves); |
| } |
| |
| |
// Accumulates the given agents as allocation candidates and ensures that a
// single `_generateOffers` run is pending; concurrent requests are batched
// into that run and share its future.
Future<Nothing> HierarchicalAllocatorProcess::generateOffers(
    const hashset<SlaveID>& slaveIds)
{
  if (paused) {
    VLOG(2) << "Skipped allocation because the allocator is paused";

    return Nothing();
  }

  // Accumulate candidates; the pending run (existing or dispatched below)
  // will consume them all at once.
  allocationCandidates |= slaveIds;

  // Dispatch a new run only if none is pending, so multiple calls in quick
  // succession collapse into a single allocation pass.
  if (offerGeneration.isNone() || !offerGeneration->isPending()) {
    metrics.allocation_run_latency.start();
    offerGeneration = dispatch(self(), &Self::_generateOffers);
  }

  return offerGeneration.get();
}
| |
| |
// Performs one batched allocation run over the accumulated
// `allocationCandidates`, recording run metrics, and clears the
// candidate set on completion.
Nothing HierarchicalAllocatorProcess::_generateOffers()
{
  // Stop the latency timer started when this run was dispatched.
  metrics.allocation_run_latency.stop();

  // The allocator may have been paused between dispatch and execution.
  if (paused) {
    VLOG(2) << "Skipped allocation because the allocator is paused";

    return Nothing();
  }

  ++metrics.allocation_runs;

  Stopwatch stopwatch;
  stopwatch.start();
  metrics.allocation_run.start();

  __generateOffers();

  // NOTE: For now, we implement maintenance inverse offers within the
  // allocator. We leverage the existing timer/cycle of offers to also do any
  // inverse offers generation necessary to satisfy maintenance needs.
  generateInverseOffers();

  metrics.allocation_run.stop();

  VLOG(1) << "Performed allocation for " << allocationCandidates.size()
          << " agents in " << stopwatch.elapsed();

  // Clear the candidates on completion of the allocation run.
  allocationCandidates.clear();

  return Nothing();
}
| |
| |
// TODO(alexr): Consider factoring out the quota allocation logic.
//
// Generates resource offers for the current `allocationCandidates` and
// delivers them via `offerCallback`. The run proceeds in two stages:
//
//   Stage 1: allocate only to roles with non-default, not-yet-satisfied
//            quota guarantees, so picky frameworks in those roles get
//            first choice of agents.
//   Stage 2: allocate the remaining resources to all roles, while
//            holding back enough unreserved non-revocable headroom to
//            satisfy any still-unsatisfied guarantees later.
//
// Quota limits and the global quota headroom are enforced in both
// stages by "shrinking" (chopping) the unreserved scalar portion of
// each prospective offer.
void HierarchicalAllocatorProcess::__generateOffers()
{
  // Compute the offerable resources, per framework:
  //   (1) For reserved resources on the slave, allocate these to a
  //       framework having the corresponding role.
  //   (2) For unreserved resources on the slave, allocate these
  //       to a framework of any role.
  hashmap<FrameworkID, hashmap<string, hashmap<SlaveID, Resources>>> offerable;

  // NOTE: This function can operate on a small subset of
  // `allocationCandidates`, we have to make sure that we don't
  // assume cluster knowledge when summing resources from that set.

  vector<SlaveID> slaveIds;
  slaveIds.reserve(allocationCandidates.size());

  // Filter out non-whitelisted, removed, and deactivated slaves
  // in order not to send offers for them.
  foreach (const SlaveID& slaveId, allocationCandidates) {
    Option<Slave*> slave = getSlave(slaveId);

    if (isWhitelisted(slaveId) && slave.isSome() && (*slave)->activated) {
      slaveIds.push_back(slaveId);
    }
  }

  // Randomize the order in which slaves' resources are allocated.
  //
  // TODO(vinod): Implement a smarter sorting algorithm.
  //
  // NOTE(review): `std::random_shuffle` was deprecated in C++14 and
  // removed in C++17; when upgrading the language standard this should
  // become `std::shuffle` with an explicit random engine.
  std::random_shuffle(slaveIds.begin(), slaveIds.end());

  // To enforce quota, we keep track of consumed quota for roles with a
  // non-default quota.
  //
  // NOTE: We build the map here to avoid repetitive aggregation in the
  // allocation loop. But this map will still need to be updated in the
  // allocation loop as we make new allocations.
  //
  // TODO(mzhu): Build and persist this information across allocation cycles in
  // track/untrackAllocatedResources().
  //
  // TODO(mzhu): Ideally, we want the sorter to track consumed quota. It then
  // could use consumed quota instead of allocated resources (the former
  // includes unallocated reservations while the latter does not) to calculate
  // the DRF share. This would help to:
  //
  //   (1) Solve the fairness issue when roles with unallocated
  //       reservations may game the allocator (See MESOS-8299).
  //
  //   (2) Simplify the quota enforcement logic -- the allocator
  //       would no longer need to track reservations separately.
  hashmap<string, ResourceQuantities> rolesConsumedQuota;

  // We only log headroom info if there is any non-default quota set.
  // We set this flag value as we iterate through all roles below.
  //
  // TODO(mzhu): remove this once we can determine if there is any non-default
  // quota set by looking into the allocator memory state in constant time.
  bool logHeadroomInfo = false;

  // We charge a role against its quota by considering its allocation
  // (including all subrole allocations) as well as any unallocated
  // reservations (including all subrole reservations) since reservations
  // are bound to the role. In other words, we always consider reservations
  // as consuming quota, regardless of whether they are allocated.
  // It is calculated as:
  //
  //   Consumed Quota = reservations + unreserved allocation

  // Add reservations and unreserved offeredOrAllocated.
  //
  // Currently, only top level roles can have quota set and thus
  // we only track consumed quota for top level roles.
  foreach (const Role* r, roleTree.root()->children()) {
    // TODO(mzhu): Track all role consumed quota. We may want to expose
    // these as metrics.
    if (r->quota() != DEFAULT_QUOTA) {
      logHeadroomInfo = true;
      rolesConsumedQuota[r->role] += r->quotaOfferedOrConsumed();
    }
  }

  // We need to constantly make sure that we are holding back enough
  // unreserved resources that the remaining quota guarantee can later
  // be satisfied when needed:
  //
  //   Required unreserved headroom =
  //     sum (guarantee - consumed quota) for each role.
  //
  // Given the above, if a role has more reservations (which count towards
  // consumed quota) than quota guarantee, we don't need to hold back any
  // unreserved headroom for it.
  ResourceQuantities requiredHeadroom;
  foreach (const Role* r, roleTree.root()->children()) {
    requiredHeadroom +=
      r->quota().guarantees -
      rolesConsumedQuota.get(r->role).getOrElse(ResourceQuantities());
  }

  // We will allocate resources while ensuring that the required
  // unreserved non-revocable headroom is still available. Otherwise,
  // we will not be able to satisfy the quota guarantee later.
  //
  //   available headroom = unallocated unreserved non-revocable resources
  //
  // We compute this as:
  //
  //   available headroom = total resources -
  //                        allocated resources -
  //                        unallocated reservations -
  //                        unallocated revocable resources
  ResourceQuantities availableHeadroom = totalScalarQuantities;

  // NOTE: The role sorter does not return aggregated allocation
  // information whereas `reservationScalarQuantities` does, so
  // we need to loop over only top level roles for the latter.

  // Subtract allocated resources from the total.
  availableHeadroom -= roleSorter->allocationScalarQuantities();

  // Subtract total unallocated reservations.
  //   unallocated reservations = total reservations - allocated reservations
  availableHeadroom -=
    roleTree.root()->reservationScalarQuantities() -
    roleTree.root()->offeredOrAllocatedReservedScalarQuantities();

  // Subtract revocable resources.
  foreachvalue (const Slave& slave, slaves) {
    availableHeadroom -= ResourceQuantities::fromScalarResources(
        slave.getAvailable().revocable().scalars());
  }

  if (logHeadroomInfo) {
    LOG(INFO) << "Before allocation, required quota headroom is "
              << requiredHeadroom
              << " and available quota headroom is " << availableHeadroom;
  }

  // Due to the two stages in the allocation algorithm and the nature of
  // shared resources being re-offerable even if already allocated, the
  // same shared resources can appear in two (and not more due to the
  // `allocatable` check in each stage) distinct offers in one allocation
  // cycle. This is undesirable since the allocator API contract should
  // not depend on its implementation details. For now we make sure a
  // shared resource is only allocated once in one offer cycle. We use
  // `offeredSharedResources` to keep track of shared resources already
  // allocated in the current cycle.
  hashmap<SlaveID, Resources> offeredSharedResources;

  // In the 1st stage, we allocate to roles with non-default quota guarantees.
  //
  // NOTE: Even though we keep track of the available headroom, we still
  // dedicate the first stage for roles with non-default quota guarantees.
  // The reason is that quota guarantees headroom only acts as a quantity
  // guarantee. Frameworks might have filters or capabilities such that the
  // resources set aside for the headroom cannot be used by these frameworks,
  // resulting in unsatisfied guarantees (despite enough headroom set aside).
  // Thus we try to satisfy the quota guarantees in this first stage so that
  // those roles with unsatisfied guarantees can have more choices and higher
  // probability in getting their guarantees satisfied.
  foreach (const SlaveID& slaveId, slaveIds) {
    Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

    foreach (const string& role, roleSorter->sort()) {
      const Quota& quota = getQuota(role);

      const ResourceQuantities& quotaGuarantees = quota.guarantees;
      const ResourceLimits& quotaLimits = quota.limits;

      // We only allocate to roles with non-default guarantees
      // in the first stage.
      if (quotaGuarantees.empty()) {
        continue;
      }

      // If there are no active frameworks in this role, we do not
      // need to do any allocations for this role.
      bool noFrameworks = [&]() {
        Option<const Role*> r = roleTree.get(role);

        return r.isNone() || (*r)->frameworks().empty();
      }();

      if (noFrameworks) {
        continue;
      }

      // TODO(bmahler): Handle shared volumes, which are always available but
      // should be excluded here based on `offeredSharedResources`.
      if (slave.getAvailable().empty()) {
        break; // Nothing left on this agent.
      }

      ResourceQuantities unsatisfiedQuotaGuarantees =
        quotaGuarantees -
        rolesConsumedQuota.get(role).getOrElse(ResourceQuantities());

      // We only allocate to roles with unsatisfied guarantees
      // in the first stage.
      if (unsatisfiedQuotaGuarantees.empty()) {
        continue;
      }

      // Fetch frameworks in the order provided by the sorter.
      // NOTE: Suppressed frameworks are not included in the sort.
      Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

      foreach (const string& frameworkId_, frameworkSorter->sort()) {
        if (unsatisfiedQuotaGuarantees.empty()) {
          break;
        }

        // Offer a shared resource only if it has not been offered in this
        // offer cycle to a framework.
        Resources available =
          slave.getAvailable().allocatableTo(role) -
          offeredSharedResources.get(slaveId).getOrElse(Resources());

        if (available.empty()) {
          break; // Nothing left for the role.
        }

        FrameworkID frameworkId;
        frameworkId.set_value(frameworkId_);

        const Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId));
        CHECK(framework.active) << frameworkId;

        // An early `continue` optimization.
        if (!allocatable(available, role, framework)) {
          continue;
        }

        if (!isCapableOfReceivingAgent(framework.capabilities, slave)) {
          continue;
        }

        available = stripIncapableResources(available, framework.capabilities);

        // In this first stage, we allocate the role's reservations as well as
        // any unreserved resources while enforcing the role's quota limits and
        // the global headroom. We'll "chop" the unreserved resources if needed.
        //
        //   E.g. A role has no allocations or reservations yet and a 10 cpu
        //        quota limits. We'll chop a 15 cpu agent down to only
        //        allocate 10 cpus to the role to keep it within its limits.
        //
        // Note on bursting above guarantees up to the limits in the 1st stage:
        //
        // In this 1st stage, for resources that the role has non-default
        // guarantees, we allow the role to burst above this guarantee up to
        // its limit (while maintaining the global headroom). In addition,
        // if the role is allocated any resources that help it to make
        // progress towards its quota guarantees, or the role is being
        // allocated some reservation(s), we will also allocate all of the
        // resources (subject to its limits and global headroom) for which it
        // does not have any guarantees for.
        //
        //   E.g. The agent has 1 cpu, 1024 mem, 1024 disk, 1 gpu, 5 ports
        //        and the role has guarantees for 1 cpu, 512 mem and no limits.
        //        We'll include all the disk, gpu, and ports in the allocation,
        //        despite the role not having any quota guarantee for them. In
        //        addition, we will also allocate all the 1024 mem to the role.
        //
        // Rationale of allocating all non-guaranteed resources on the agent
        // (subject to role limits and global headroom requirements):
        //
        // Currently, it is not possible to set quota on non-scalar resources,
        // like ports. A user may also only choose to set guarantees on some
        // scalar resources (e.g. on cpu but not on memory). If we do not
        // allocate these resources together with the guarantees, frameworks
        // will get non-usable offers (e.g. with no ports or memory).
        // However, the downside of this approach is that, after one allocation,
        // the agent will be deprived of some resources (e.g. no ports),
        // rendering any subsequent offers non-usable. Users are advised to
        // leverage the `min_allocatable_resources` to help prevent such offers
        // and reduce resource fragmentation.
        //
        // Rationale of allowing roles to burst scalar resource allocations up
        // to its limits (subject to headroom requirements) in this first stage:
        //
        // Allowing roles to burst in this first stage would help to reduce
        // fragmentation--guaranteed resources and non-guarantee bursting
        // resources are combined into one offer from one agent. However,
        // the downside is that, such premature bursting may prevent
        // subsequent roles from getting guarantees, especially if their
        // frameworks are picky. This is true despite the enforced headroom
        // which only enforces quantity. Nevertheless, we choose to allow
        // such bursting for less resource fragmentation.

        // Resources that can be used to increase a role's quota consumption.
        //
        // This is a hot path, we use explicit filter calls to avoid
        // multiple traversal.
        Resources quotaResources =
          available.filter([&](const Resource& resource) {
            return resource.type() == Value::SCALAR &&
                   Resources::isUnreserved(resource) &&
                   !Resources::isRevocable(resource);
          });

        Resources guaranteesOffering =
          shrinkResources(quotaResources, unsatisfiedQuotaGuarantees);

        // We allocate this agent only if the role can make progress towards
        // its quota guarantees i.e. it is getting some unreserved resources
        // for its guarantees. Otherwise, this role is not going to get any
        // allocation. We can safely continue here.
        //
        // NOTE: For roles with unallocated reservations on this agent, if
        // its guarantees are already satisfied or this agent has no resources
        // that can contribute to its guarantees (except the reservation), we
        // will also skip it here. Its reservations will be allocated in the
        // second stage.
        //
        // NOTE: Since we currently only support top-level roles to
        // have quota, there are no ancestor reservations involved here.
        if (guaranteesOffering.empty()) {
          continue;
        }

        // This role's reservations, non-scalar resources and revocable
        // resources, as well as guarantees are always allocated.
        //
        // We need to allocate guarantees unconditionally here so that
        // even the cluster is overcommitted by guarantees (thus deficit in
        // headroom), this role's guarantees can still be allocated.
        Resources toOffer = guaranteesOffering +
          available.filter([&](const Resource& resource) {
            return Resources::isReserved(resource, role) ||
                   resource.type() != Value::SCALAR ||
                   Resources::isRevocable(resource);
          });

        Resources additionalScalarOffering =
          quotaResources - guaranteesOffering;

        // Then, non-guaranteed quota resources are subject to quota limits
        // and global headroom enforcements.

        // Limits enforcement.
        if (!quotaLimits.empty()) {
          additionalScalarOffering = shrinkResources(
              additionalScalarOffering,
              quotaLimits - CHECK_NOTNONE(rolesConsumedQuota.get(role)) -
                ResourceQuantities::fromScalarResources(guaranteesOffering));
        }

        // Headroom enforcement.
        //
        // This check is only for performance optimization.
        if (!requiredHeadroom.empty() && !additionalScalarOffering.empty()) {
          // Shrink down to surplus headroom.
          //
          //   Surplus headroom = (availableHeadroom - guaranteesOffering) -
          //                      (requiredHeadroom - guaranteesOffering)
          //                    = availableHeadroom - requiredHeadroom
          additionalScalarOffering = shrinkResources(
              additionalScalarOffering, availableHeadroom - requiredHeadroom);
        }

        toOffer += additionalScalarOffering;

        // If the framework filters these resources, ignore.
        if (!allocatable(toOffer, role, framework) ||
            isFiltered(framework, role, slave, toOffer)) {
          continue;
        }

        VLOG(2) << "Offering " << toOffer << " on agent " << slaveId
                << " to role " << role << " of framework " << frameworkId
                << " as part of its role quota";

        toOffer.allocate(role);

        offerable[frameworkId][role][slaveId] += toOffer;
        offeredSharedResources[slaveId] += toOffer.shared();

        // Update role consumed quota and quota headroom.

        ResourceQuantities increasedQuotaConsumption =
          ResourceQuantities::fromScalarResources(
              guaranteesOffering + additionalScalarOffering);

        unsatisfiedQuotaGuarantees -= increasedQuotaConsumption;
        rolesConsumedQuota[role] += increasedQuotaConsumption;
        for (const string& ancestor : roles::ancestors(role)) {
          rolesConsumedQuota[ancestor] += increasedQuotaConsumption;
        }

        requiredHeadroom -=
          ResourceQuantities::fromScalarResources(guaranteesOffering);
        availableHeadroom -= increasedQuotaConsumption;

        slave.decreaseAvailable(frameworkId, toOffer);

        trackAllocatedResources(slaveId, frameworkId, toOffer);
      }
    }
  }

  // Similar to the first stage, we will allocate resources while ensuring
  // that the required unreserved non-revocable headroom is still available
  // for unsatisfied quota guarantees. Otherwise, we will not be able to
  // satisfy quota guarantees later. Reservations and revocable resources
  // will always be included in the offers since allocating these does not
  // make progress towards satisfying quota guarantees.
  //
  // For logging purposes, we track the number of agents that had resources
  // held back for quota headroom, as well as how many resources in total
  // were held back.
  //
  // While we also held resources back for quota headroom in the first stage,
  // we do not track it there. This is because in the second stage, we try to
  // allocate all resources (including the ones held back in the first stage).
  // Thus only resources held back in the second stage are truly held back for
  // the whole allocation cycle.
  ResourceQuantities heldBackForHeadroom;
  size_t heldBackAgentCount = 0;

  // We randomize the agents here to "spread out" the effect of the first
  // stage, which tends to allocate from the front of the agent list more
  // so than the back.
  std::random_shuffle(slaveIds.begin(), slaveIds.end());

  foreach (const SlaveID& slaveId, slaveIds) {
    Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

    foreach (const string& role, roleSorter->sort()) {
      // TODO(bmahler): Handle shared volumes, which are always available but
      // should be excluded here based on `offeredSharedResources`.
      if (slave.getAvailable().empty()) {
        break; // Nothing left on this agent.
      }

      const ResourceLimits& quotaLimits = getQuota(role).limits;

      // NOTE: Suppressed frameworks are not included in the sort.
      Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

      foreach (const string& frameworkId_, frameworkSorter->sort()) {
        // Offer a shared resource only if it has not been offered in this
        // offer cycle to a framework.
        Resources available =
          slave.getAvailable().allocatableTo(role) -
          offeredSharedResources.get(slaveId).getOrElse(Resources());

        if (available.empty()) {
          break; // Nothing left for the role.
        }

        FrameworkID frameworkId;
        frameworkId.set_value(frameworkId_);

        const Framework& framework = *CHECK_NOTNONE(getFramework(frameworkId));

        // An early `continue` optimization.
        if (!allocatable(available, role, framework)) {
          continue;
        }

        if (!isCapableOfReceivingAgent(framework.capabilities, slave)) {
          continue;
        }

        available = stripIncapableResources(available, framework.capabilities);

        // Reservations (including the roles ancestors' reservations),
        // non-scalar resources and revocable resources are always allocated.
        Resources toOffer = available.filter([&](const Resource& resource) {
          return Resources::isReserved(resource) ||
                 resource.type() != Value::SCALAR ||
                 Resources::isRevocable(resource);
        });

        // Then, unreserved scalar resources are subject to quota limits
        // and global headroom enforcement.
        //
        // This is a hot path, we use explicit filter calls to avoid
        // multiple traversal.
        Resources additionalScalarOffering =
          available.filter([&](const Resource& resource) {
            return resource.type() == Value::SCALAR &&
                   Resources::isUnreserved(resource) &&
                   !Resources::isRevocable(resource);
          });

        // Limits enforcement.
        if (!quotaLimits.empty()) {
          additionalScalarOffering = shrinkResources(
              additionalScalarOffering,
              quotaLimits - CHECK_NOTNONE(rolesConsumedQuota.get(role)));
        }

        // Headroom enforcement.
        //
        // This check is only for performance optimization.
        if (!requiredHeadroom.empty() && !additionalScalarOffering.empty()) {
          Resources shrunk = shrinkResources(
              additionalScalarOffering, availableHeadroom - requiredHeadroom);

          // If resources are held back.
          if (shrunk != additionalScalarOffering) {
            heldBackForHeadroom += ResourceQuantities::fromScalarResources(
                additionalScalarOffering - shrunk);
            ++heldBackAgentCount;

            additionalScalarOffering = std::move(shrunk);
          }
        }

        toOffer += additionalScalarOffering;

        // If the framework filters these resources, ignore.
        if (!allocatable(toOffer, role, framework) ||
            isFiltered(framework, role, slave, toOffer)) {
          continue;
        }

        VLOG(2) << "Offering " << toOffer << " on agent " << slaveId
                << " to role " << role << " of framework " << frameworkId;

        toOffer.allocate(role);

        offerable[frameworkId][role][slaveId] += toOffer;
        offeredSharedResources[slaveId] += toOffer.shared();

        // Update role consumed quota and quota headroom.

        ResourceQuantities increasedQuotaConsumption =
          ResourceQuantities::fromScalarResources(additionalScalarOffering);

        if (getQuota(role) != DEFAULT_QUOTA) {
          rolesConsumedQuota[role] += increasedQuotaConsumption;
          for (const string& ancestor : roles::ancestors(role)) {
            rolesConsumedQuota[ancestor] += increasedQuotaConsumption;
          }
        }

        availableHeadroom -= increasedQuotaConsumption;

        slave.decreaseAvailable(frameworkId, toOffer);

        trackAllocatedResources(slaveId, frameworkId, toOffer);
      }
    }
  }

  if (logHeadroomInfo) {
    LOG(INFO) << "After allocation, " << requiredHeadroom
              << " are required for quota headroom, "
              << heldBackForHeadroom << " were held back from "
              << heldBackAgentCount
              << " agents to ensure sufficient quota headroom";
  }

  if (offerable.empty()) {
    VLOG(2) << "No allocations performed";
  } else {
    // Now offer the resources to each framework.
    foreachkey (const FrameworkID& frameworkId, offerable) {
      offerCallback(frameworkId, offerable.at(frameworkId));
    }
  }
}
| |
| |
| void HierarchicalAllocatorProcess::generateInverseOffers() |
| { |
| // In this case, `offerable` is actually the slaves and/or resources that we |
| // want the master to create `InverseOffer`s from. |
| hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable; |
| |
| // For maintenance, we only send inverse offers to frameworks that have the |
| // potential to lose something (i.e. it has resources offered or allocated on |
| // a given agent). We keep track of which frameworks already have an |
| // outstanding inverse offer for the given slave in the UnavailabilityStatus |
| // of the specific slave using the `offerOutstanding` flag. This is equivalent |
| // to the accounting we do for resources when we send regular offers. If we |
| // didn't keep track of outstanding offers then we would keep generating new |
| // inverse offers even though the framework had not responded yet. |
| // |
| // TODO(mzhu): Need to consider reservations as well. |
| foreach (const SlaveID& slaveId, allocationCandidates) { |
| Slave& slave = *CHECK_NOTNONE(getSlave(slaveId)); |
| |
| if (slave.maintenance.isSome()) { |
| // We use a reference by alias because we intend to modify the |
| // `maintenance` and to improve readability. |
| Slave::Maintenance& maintenance = slave.maintenance.get(); |
| |
| foreachkey ( |
| const FrameworkID& frameworkId, slave.getOfferedOrAllocated()) { |
| const Option<Framework*> framework_ = getFramework(frameworkId); |
| // NOTE: This method might be called in-between adding per-framework |
| // used resources on an agent via `addSlave()` and adding a framework |
| // via `addFramework()`. |
| if (framework_.isNone()) { |
| // Framework is using resources on an agent but has not yet been |
| // re-added to allocator via addFramework(). |
| continue; |
| } |
| |
| const Framework& framework = **framework_; |
| |
| // No need to deallocate for an inactive framework as the master |
| // will not send it inverse offers. |
| if (!framework.active) { |
| continue; |
| } |
| |
| // If this framework doesn't already have inverse offers for the |
| // specified slave. |
| if (!offerable[frameworkId].contains(slaveId)) { |
| // If there isn't already an outstanding inverse offer to this |
| // framework for the specified slave. |
| if (!maintenance.offersOutstanding.contains(frameworkId)) { |
| // Ignore in case the framework filters inverse offers for this |
| // slave. |
| // |
| // NOTE: Since this specific allocator implementation only sends |
| // inverse offers for maintenance primitives, and those are at the |
| // whole slave level, we only need to filter based on the |
| // time-out. |
| if (isFiltered(framework, slave)) { |
| continue; |
| } |
| |
| const UnavailableResources unavailableResources = |
| UnavailableResources{Resources(), maintenance.unavailability}; |
| |
| // For now we send inverse offers with empty resources when the |
| // inverse offer represents maintenance on the machine. In the |
| // future we could be more specific about the resources on the |
| // host, as we have the information available. |
| offerable[frameworkId][slaveId] = unavailableResources; |
| |
| // Mark this framework as having an offer outstanding for the |
| // specified slave. |
| maintenance.offersOutstanding.insert(frameworkId); |
| } |
| } |
| } |
| } |
| } |
| |
| if (offerable.empty()) { |
| VLOG(2) << "No inverse offers to send out!"; |
| } else { |
| // Now send inverse offers to each framework. |
| foreachkey (const FrameworkID& frameworkId, offerable) { |
| inverseOfferCallback(frameworkId, offerable[frameworkId]); |
| } |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::_expire( |
| const FrameworkID& frameworkId, |
| const string& role, |
| const SlaveID& slaveId, |
| const weak_ptr<OfferFilter>& offerFilter) |
| { |
| // The filter might have already been removed (e.g., if the |
| // framework no longer exists or in `reviveOffers()`) but |
| // we may land here if the cancelation of the expiry timeout |
| // did not succeed (due to the dispatch already being in the |
| // queue). |
| shared_ptr<OfferFilter> filter = offerFilter.lock(); |
| |
| if (filter.get() == nullptr) { |
| return; |
| } |
| |
| // Since this is a performance-sensitive piece of code, |
| // we use find to avoid the doing any redundant lookups. |
| auto frameworkIterator = frameworks.find(frameworkId); |
| CHECK(frameworkIterator != frameworks.end()); |
| |
| Framework& framework = frameworkIterator->second; |
| |
| auto roleFilters = framework.offerFilters.find(role); |
| CHECK(roleFilters != framework.offerFilters.end()); |
| |
| auto agentFilters = roleFilters->second.find(slaveId); |
| CHECK(agentFilters != roleFilters->second.end()); |
| |
| // Erase the filter (may be a no-op per the comment above). |
| agentFilters->second.erase(filter); |
| if (agentFilters->second.empty()) { |
| roleFilters->second.erase(slaveId); |
| } |
| if (roleFilters->second.empty()) { |
| framework.offerFilters.erase(role); |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::expire( |
| const FrameworkID& frameworkId, |
| const string& role, |
| const SlaveID& slaveId, |
| const weak_ptr<OfferFilter>& offerFilter) |
| { |
| dispatch( |
| self(), |
| &Self::_expire, |
| frameworkId, |
| role, |
| slaveId, |
| offerFilter); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::expire( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const weak_ptr<InverseOfferFilter>& inverseOfferFilter) |
| { |
| // The filter might have already been removed (e.g., if the |
| // framework no longer exists or in |
| // HierarchicalAllocatorProcess::reviveOffers) but |
| // we may land here if the cancelation of the expiry timeout |
| // did not succeed (due to the dispatch already being in the |
| // queue). |
| shared_ptr<InverseOfferFilter> filter = inverseOfferFilter.lock(); |
| |
| if (filter.get() == nullptr) { |
| return; |
| } |
| |
| // Since this is a performance-sensitive piece of code, |
| // we use find to avoid the doing any redundant lookups. |
| auto frameworkIterator = frameworks.find(frameworkId); |
| CHECK(frameworkIterator != frameworks.end()); |
| |
| Framework& framework = frameworkIterator->second; |
| |
| auto filters = framework.inverseOfferFilters.find(slaveId); |
| CHECK(filters != framework.inverseOfferFilters.end()); |
| |
| filters->second.erase(filter); |
| if (filters->second.empty()) { |
| framework.inverseOfferFilters.erase(slaveId); |
| } |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isWhitelisted( |
| const SlaveID& slaveId) const |
| { |
| return whitelist.isNone() || |
| whitelist->contains(CHECK_NOTNONE(getSlave(slaveId))->info.hostname()); |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isFiltered( |
| const Framework& framework, |
| const string& role, |
| const Slave& slave, |
| const Resources& resources) const |
| { |
| // TODO(mpark): Consider moving these filter logic out and into the master, |
| // since they are not specific to the hierarchical allocator but rather are |
| // global allocation constraints. |
| |
| // Prevent offers from non-MULTI_ROLE agents to be allocated |
| // to MULTI_ROLE frameworks. |
| if (framework.capabilities.multiRole && |
| !slave.capabilities.multiRole) { |
| LOG(WARNING) << "Implicitly filtering agent " << slave.info.id() |
| << " from framework " << framework.frameworkId |
| << " because the framework is MULTI_ROLE capable" |
| << " but the agent is not"; |
| |
| return true; |
| } |
| |
| // Prevent offers from non-HIERARCHICAL_ROLE agents to be allocated |
| // to hierarchical roles. |
| if (!slave.capabilities.hierarchicalRole && strings::contains(role, "/")) { |
| LOG(WARNING) << "Implicitly filtering agent " << slave.info.id() |
| << " from role " << role |
| << " because the role is hierarchical but the agent is not" |
| << " HIERARCHICAL_ROLE capable"; |
| |
| return true; |
| } |
| |
| // Since this is a performance-sensitive piece of code, |
| // we use find to avoid the doing any redundant lookups. |
| auto roleFilters = framework.offerFilters.find(role); |
| if (roleFilters == framework.offerFilters.end()) { |
| return false; |
| } |
| |
| auto agentFilters = roleFilters->second.find(slave.info.id()); |
| if (agentFilters == roleFilters->second.end()) { |
| return false; |
| } |
| |
| foreach (const shared_ptr<OfferFilter>& offerFilter, agentFilters->second) { |
| if (offerFilter->filter(resources)) { |
| VLOG(1) << "Filtered offer with " << resources |
| << " on agent " << slave.info.id() |
| << " for role " << role |
| << " of framework " << framework.frameworkId; |
| |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isFiltered( |
| const Framework& framework, const Slave& slave) const |
| { |
| if (framework.inverseOfferFilters.contains(slave.info.id())) { |
| foreach (const shared_ptr<InverseOfferFilter>& inverseOfferFilter, |
| framework.inverseOfferFilters.at(slave.info.id())) { |
| if (inverseOfferFilter->filter()) { |
| VLOG(1) << "Filtered unavailability on agent " << slave.info.id() |
| << " for framework " << framework.frameworkId; |
| |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::allocatable( |
| const Resources& resources, |
| const string& role, |
| const Framework& framework) const |
| { |
| if (resources.empty()) { |
| return false; |
| } |
| |
| // By default we check against the globally configured minimal |
| // allocatable resources. |
| // |
| // NOTE: We use a pointer instead of `Option` semantics here to |
| // avoid copying vectors in code in the hot path of the allocator. |
| const vector<ResourceQuantities>* _minAllocatableResources = |
| options.minAllocatableResources.isSome() |
| ? &options.minAllocatableResources.get() |
| : nullptr; |
| |
| if (framework.minAllocatableResources.contains(role)) { |
| _minAllocatableResources = &framework.minAllocatableResources.at(role); |
| } |
| |
| // If no minimal requirements or an empty set of requirments are |
| // configured any resource is allocatable. |
| if (_minAllocatableResources == nullptr || |
| _minAllocatableResources->empty()) { |
| return true; |
| } |
| |
| return std::any_of( |
| _minAllocatableResources->begin(), |
| _minAllocatableResources->end(), |
| [&](const ResourceQuantities& qs) { return resources.contains(qs); }); |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_resources_offered_or_allocated( |
| const string& resource) |
| { |
| double offered_or_allocated = 0; |
| |
| foreachvalue (const Slave& slave, slaves) { |
| Option<Value::Scalar> value = |
| slave.getTotalOfferedOrAllocated().get<Value::Scalar>(resource); |
| |
| if (value.isSome()) { |
| offered_or_allocated += value->value(); |
| } |
| } |
| |
| return offered_or_allocated; |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_resources_total( |
| const string& resource) |
| { |
| return totalScalarQuantities.get(resource).value(); |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_quota_offered_or_allocated( |
| const string& role, |
| const string& resource) |
| { |
| if (!roleSorter->contains(role)) { |
| // This can occur when execution of this callback races with removal of the |
| // metric for a role which does not have any associated frameworks. |
| return 0.; |
| } |
| |
| return roleSorter->allocationScalarQuantities(role).get(resource).value(); |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_offer_filters_active( |
| const string& role) |
| { |
| double result = 0; |
| |
| foreachvalue (const Framework& framework, frameworks) { |
| if (!framework.offerFilters.contains(role)) { |
| continue; |
| } |
| |
| foreachkey (const SlaveID& slaveId, framework.offerFilters.at(role)) { |
| result += framework.offerFilters.at(role).at(slaveId).size(); |
| } |
| } |
| |
| return result; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole( |
| const FrameworkID& frameworkId, const string& role) const |
| { |
| Option<const Role*> r = roleTree.get(role); |
| return r.isSome() && (*r)->frameworks().contains(frameworkId); |
| } |
| |
| |
| Option<Slave*> HierarchicalAllocatorProcess::getSlave( |
| const SlaveID& slaveId) const |
| { |
| auto it = slaves.find(slaveId); |
| |
| if (it == slaves.end()) return None(); |
| |
| return const_cast<Slave*>(&it->second); |
| } |
| |
| |
| Option<Framework*> HierarchicalAllocatorProcess::getFramework( |
| const FrameworkID& frameworkId) const |
| { |
| auto it = frameworks.find(frameworkId); |
| |
| if (it == frameworks.end()) return None(); |
| |
| return const_cast<Framework*>(&it->second); |
| } |
| |
| |
| Option<Sorter*> HierarchicalAllocatorProcess::getFrameworkSorter( |
| const string& role) const |
| { |
| auto it = frameworkSorters.find(role); |
| |
| if (it == frameworkSorters.end()) return None(); |
| |
| return const_cast<Sorter*>(it->second.get()); |
| } |
| |
| |
| const Quota& HierarchicalAllocatorProcess::getQuota(const string& role) const |
| { |
| Option<const Role*> r = roleTree.get(role); |
| |
| return r.isSome() ? (*r)->quota() : DEFAULT_QUOTA; |
| } |
| |
| |
// Starts tracking `framework` under `role`: the framework is added to
// the role tree and to the role's framework sorter. If this is the
// first framework under the role, the per-role sorter state is
// created first.
void HierarchicalAllocatorProcess::trackFrameworkUnderRole(
    const Framework& framework, const string& role)
{
  CHECK(initialized);

  // If this is the first framework to subscribe to this role,
  // initialize state as necessary.
  if (roleTree.get(role).isNone() ||
      (*roleTree.get(role))->frameworks().empty()) {
    // The role must not be known to the role sorter yet; register and
    // activate it.
    CHECK_NOT_CONTAINS(*roleSorter, role);
    roleSorter->add(role);
    roleSorter->activate(role);

    // Create the sorter that orders frameworks within this role and
    // seed it with the scalar totals of all known agents.
    CHECK_NOT_CONTAINS(frameworkSorters, role);
    frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});

    Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

    frameworkSorter->initialize(options.fairnessExcludeResourceNames);

    foreachvalue (const Slave& slave, slaves) {
      frameworkSorter->addSlave(
          slave.info.id(),
          ResourceQuantities::fromScalarResources(slave.getTotal().scalars()));
    }
  }

  roleTree.trackFramework(framework.frameworkId, role);

  Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

  // A framework must not be tracked twice under the same role.
  CHECK_NOT_CONTAINS(*frameworkSorter, framework.frameworkId.value())
    << " for role " << role;
  frameworkSorter->add(framework.frameworkId.value());
}
| |
| |
// Attempts to stop tracking `framework` under `role`. Returns `false`
// (leaving all state untouched) while the framework still has
// resources allocated under the role; otherwise removes the framework
// and, if it was the last one under the role, tears down the role's
// sorter state and returns `true`.
bool HierarchicalAllocatorProcess::tryUntrackFrameworkUnderRole(
    const Framework& framework, const string& role)
{
  CHECK(initialized);

  Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));
  CHECK_CONTAINS(*frameworkSorter, framework.frameworkId.value())
    << " for role " << role;

  // Bail out while the framework still holds an allocation under this
  // role; untracking now would lose accounting for those resources.
  if (!frameworkSorter->allocation(framework.frameworkId.value()).empty()) {
    return false;
  }

  roleTree.untrackFramework(framework.frameworkId, role);

  frameworkSorter->remove(framework.frameworkId.value());

  // If no frameworks remain under the role, remove the now-empty
  // per-role sorter state (the inverse of the setup performed in
  // `trackFrameworkUnderRole()`).
  if (roleTree.get(role).isNone() ||
      (*roleTree.get(role))->frameworks().empty()) {
    CHECK_EQ(frameworkSorter->count(), 0u);
    roleSorter->remove(role);
    frameworkSorters.erase(role);
  }

  return true;
}
| |
| |
// Updates the agent's total resources to `total`. Returns `false` if
// the total is unchanged; otherwise updates reservation tracking, the
// allocator-wide scalar totals, and the per-agent totals in every
// sorter, and returns `true`.
bool HierarchicalAllocatorProcess::updateSlaveTotal(
    const SlaveID& slaveId,
    const Resources& total)
{
  Slave& slave = *CHECK_NOTNONE(getSlave(slaveId));

  const Resources oldTotal = slave.getTotal();

  if (oldTotal == total) {
    return false;
  }

  slave.updateTotal(total);

  // Swap the old reserved resources for the new ones in the role
  // tree's reservation tracking.
  roleTree.untrackReservations(oldTotal.reserved());
  roleTree.trackReservations(total.reserved());

  // Update the total in the allocator and totals in the sorters.
  const ResourceQuantities oldAgentScalarQuantities =
    ResourceQuantities::fromScalarResources(oldTotal.scalars());

  const ResourceQuantities agentScalarQuantities =
    ResourceQuantities::fromScalarResources(total.scalars());

  // Subtract the agent's previous contribution before adding the new
  // one; the old quantities must still be accounted for in the total.
  CHECK_CONTAINS(totalScalarQuantities, oldAgentScalarQuantities);
  totalScalarQuantities -= oldAgentScalarQuantities;
  totalScalarQuantities += agentScalarQuantities;

  // Replace the agent's totals in the sorters by removing and
  // re-adding the agent with the new quantities.
  roleSorter->removeSlave(slaveId);
  roleSorter->addSlave(slaveId, agentScalarQuantities);

  foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) {
    sorter->removeSlave(slaveId);
    sorter->addSlave(slaveId, agentScalarQuantities);
  }

  return true;
}
| |
| |
| bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const |
| { |
| // If the slave does not have a configured domain, assume it is not remote. |
| if (!slave.info.has_domain()) { |
| return false; |
| } |
| |
| // The current version of the Mesos agent refuses to startup if a |
| // domain is specified without also including a fault domain. That |
| // might change in the future, if more types of domains are added. |
| // For forward compatibility, we treat agents with a configured |
| // domain but no fault domain as having no configured domain. |
| if (!slave.info.domain().has_fault_domain()) { |
| return false; |
| } |
| |
| // If the slave has a configured domain (and it has been allowed to |
| // register with the master), the master must also have a configured |
| // domain. |
| CHECK(options.domain.isSome()); |
| |
| // The master will not startup if configured with a domain but no |
| // fault domain. |
| CHECK(options.domain->has_fault_domain()); |
| |
| const DomainInfo::FaultDomain::RegionInfo& masterRegion = |
| options.domain->fault_domain().region(); |
| const DomainInfo::FaultDomain::RegionInfo& slaveRegion = |
| slave.info.domain().fault_domain().region(); |
| |
| return masterRegion != slaveRegion; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isCapableOfReceivingAgent( |
| const protobuf::framework::Capabilities& frameworkCapabilities, |
| const Slave& slave) const |
| { |
| // Only offer resources from slaves that have GPUs to |
| // frameworks that are capable of receiving GPUs. |
| // See MESOS-5634. |
| if (options.filterGpuResources && !frameworkCapabilities.gpuResources && |
| slave.hasGpu()) { |
| return false; |
| } |
| |
| // If this framework is not region-aware, don't offer it |
| // resources on agents in remote regions. |
| if (!frameworkCapabilities.regionAware && isRemoteSlave(slave)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| |
| Resources HierarchicalAllocatorProcess::stripIncapableResources( |
| const Resources& resources, |
| const protobuf::framework::Capabilities& frameworkCapabilities) const |
| { |
| return resources.filter([&](const Resource& resource) { |
| if (!frameworkCapabilities.sharedResources && |
| Resources::isShared(resource)) { |
| return false; |
| } |
| |
| if (!frameworkCapabilities.revocableResources && |
| Resources::isRevocable(resource)) { |
| return false; |
| } |
| |
| // When reservation refinements are present, old frameworks without the |
| // RESERVATION_REFINEMENT capability won't be able to understand the |
| // new format. While it's possible to translate the refined reservations |
| // into the old format by "hiding" the intermediate reservations in the |
| // "stack", this leads to ambiguity when processing RESERVE / UNRESERVE |
| // operations. This is due to the loss of information when we drop the |
| // intermediate reservations. Therefore, for now we simply filter out |
| // resources with refined reservations if the framework does not have |
| // the capability. |
| if (!frameworkCapabilities.reservationRefinement && |
| Resources::hasRefinedReservations(resource)) { |
| return false; |
| } |
| |
| return true; |
| }); |
| } |
| |
| |
// Tracks `allocated` (resources on agent `slaveId` that are now
// offered to or allocated to `frameworkId`) in the role tree and in
// both sorter layers, split by the role the resources are allocated
// to.
void HierarchicalAllocatorProcess::trackAllocatedResources(
    const SlaveID& slaveId,
    const FrameworkID& frameworkId,
    const Resources& allocated)
{
  CHECK_CONTAINS(slaves, slaveId);
  CHECK_CONTAINS(frameworks, frameworkId);

  // TODO(bmahler): Calling allocations() is expensive since it has
  // to construct a map. Avoid this.
  foreachpair (const string& role,
               const Resources& allocation,
               allocated.allocations()) {
    // The framework has resources allocated to this role but it may
    // or may not be subscribed to the role. Either way, we need to
    // track the framework under the role.
    if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
      trackFrameworkUnderRole(*CHECK_NOTNONE(getFramework(frameworkId)), role);
    }

    CHECK_CONTAINS(*roleSorter, role);

    Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role));

    CHECK_CONTAINS(*frameworkSorter, frameworkId.value())
      << " for role " << role;

    roleTree.trackOfferedOrAllocated(slaveId, allocation);

    // Charge the allocation to the role (role sorter) and to the
    // framework within the role (framework sorter).
    roleSorter->allocated(role, slaveId, allocation);
    frameworkSorter->allocated(
        frameworkId.value(), slaveId, allocation);
  }
}
| |
| |
| void HierarchicalAllocatorProcess::untrackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& allocated) |
| { |
| // TODO(mzhu): Add a `CHECK(slaves.contains(slaveId));` |
| // here once MESOS-621 is resolved. Ideally, `removeSlave()` |
| // should unallocate resources in the framework sorters. |
| // But currently, a slave is removed first via `removeSlave()` |
| // and later a call to `recoverResources()` occurs to recover |
| // the framework's resources. |
| Framework* framework = CHECK_NOTNONE(getFramework(frameworkId)); |
| |
| // TODO(bmahler): Calling allocations() is expensive since it has |
| // to construct a map. Avoid this. |
| foreachpair (const string& role, |
| const Resources& allocation, |
| allocated.allocations()) { |
| CHECK_CONTAINS(*roleSorter, role); |
| |
| Sorter* frameworkSorter = CHECK_NOTNONE(getFrameworkSorter(role)); |
| |
| CHECK_CONTAINS(*frameworkSorter, frameworkId.value()) |
| << "for role " << role; |
| |
| roleTree.untrackOfferedOrAllocated(slaveId, allocation); |
| |
| frameworkSorter->unallocated(frameworkId.value(), slaveId, allocation); |
| |
| roleSorter->unallocated(role, slaveId, allocation); |
| |
| // If the framework is no longer subscribed to the role, we can try to |
| // untrack the framework under the role. |
| if (framework->roles.count(role) == 0) { |
| tryUntrackFrameworkUnderRole(*framework, role); |
| } |
| } |
| } |
| |
| |
| } // namespace internal { |
| } // namespace allocator { |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |