| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "master/allocator/mesos/hierarchical.hpp" |
| |
| #include <algorithm> |
| #include <set> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <mesos/attributes.hpp> |
| #include <mesos/resources.hpp> |
| #include <mesos/type_utils.hpp> |
| |
| #include <process/after.hpp> |
| #include <process/delay.hpp> |
| #include <process/dispatch.hpp> |
| #include <process/event.hpp> |
| #include <process/id.hpp> |
| #include <process/loop.hpp> |
| #include <process/timeout.hpp> |
| |
| #include <stout/check.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/set.hpp> |
| #include <stout/stopwatch.hpp> |
| #include <stout/stringify.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| #include "common/resource_quantities.hpp" |
| |
| using std::make_shared; |
| using std::set; |
| using std::shared_ptr; |
| using std::string; |
| using std::vector; |
| using std::weak_ptr; |
| |
| using mesos::allocator::InverseOfferStatus; |
| |
| using process::after; |
| using process::Continue; |
| using process::ControlFlow; |
| using process::Failure; |
| using process::Future; |
| using process::loop; |
| using process::Owned; |
| using process::PID; |
| using process::Timeout; |
| |
| |
| namespace mesos { |
| |
| // Needed to prevent shadowing of template '::operator-<std::set<T>>' |
| // by non-template '::mesos::operator-' |
| using ::operator-; |
| |
| namespace internal { |
| namespace master { |
| namespace allocator { |
| namespace internal { |
| |
| // Used to represent "filters" for resources unused in offers. |
| class OfferFilter |
| { |
| public: |
| virtual ~OfferFilter() {} |
| |
| virtual bool filter(const Resources& resources) const = 0; |
| }; |
| |
| |
| class RefusedOfferFilter : public OfferFilter |
| { |
| public: |
| RefusedOfferFilter( |
| const Resources& _resources, |
| const Duration& timeout) |
| : _resources(_resources), |
| _expired(after(timeout)) {} |
| |
| virtual ~RefusedOfferFilter() |
| { |
| // Cancel the timeout upon destruction to avoid lingering timers. |
| _expired.discard(); |
| } |
| |
| Future<Nothing> expired() const { return _expired; }; |
| |
| bool filter(const Resources& resources) const override |
| { |
| // NOTE: We do not check for the filter being expired here |
| // because `recoverResources()` expects the filter to apply |
| // until the filter is removed, see: |
| // https://github.com/apache/mesos/commit/2f170f302fe94c4 |
| // |
| // TODO(jieyu): Consider separating the superset check for regular |
| // and revocable resources. For example, frameworks might want |
| // more revocable resources only or non-revocable resources only, |
| // but currently the filter only expires if there is more of both |
| // revocable and non-revocable resources. |
| return _resources.contains(resources); // Refused resources are superset. |
| } |
| |
| private: |
| const Resources _resources; |
| Future<Nothing> _expired; |
| }; |
| |
| |
// Abstract interface for "filters" on inverse offers.
//
// NOTE: This allocator implementation only sends inverse offers for
// maintenance primitives, and those cover a whole slave. Filtering on
// the time-out alone is therefore sufficient. Should the allocator
// start sending more resource specific inverse offers, the
// `unavailableResources` could be captured in the filter function.
class InverseOfferFilter
{
public:
  virtual ~InverseOfferFilter() = default;

  // Returns whether this filter currently applies.
  virtual bool filter() const = 0;
};
| |
| |
| // NOTE: See comment above `InverseOfferFilter` regarding capturing |
| // `unavailableResources` if this allocator starts sending fine-grained inverse |
| // offers. |
| class RefusedInverseOfferFilter : public InverseOfferFilter |
| { |
| public: |
| RefusedInverseOfferFilter(const Duration& timeout) |
| : _expired(after(timeout)) {} |
| |
| virtual ~RefusedInverseOfferFilter() |
| { |
| // Cancel the timeout upon destruction to avoid lingering timers. |
| _expired.discard(); |
| } |
| |
| Future<Nothing> expired() const { return _expired; }; |
| |
| bool filter() const override |
| { |
| // See comment above why we currently don't do more fine-grained filtering. |
| return _expired.isPending(); |
| } |
| |
| private: |
| Future<Nothing> _expired; |
| }; |
| |
| |
| HierarchicalAllocatorProcess::Framework::Framework( |
| const FrameworkInfo& frameworkInfo, |
| const set<string>& _suppressedRoles, |
| bool _active) |
| : roles(protobuf::framework::getRoles(frameworkInfo)), |
| suppressedRoles(_suppressedRoles), |
| capabilities(frameworkInfo.capabilities()), |
| active(_active), |
| metrics(new FrameworkMetrics(frameworkInfo)) {} |
| |
| |
| void HierarchicalAllocatorProcess::initialize( |
| const Duration& _allocationInterval, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<string, hashmap<SlaveID, Resources>>&)>& |
| _offerCallback, |
| const lambda::function< |
| void(const FrameworkID&, |
| const hashmap<SlaveID, UnavailableResources>&)>& |
| _inverseOfferCallback, |
| const Option<set<string>>& _fairnessExcludeResourceNames, |
| bool _filterGpuResources, |
| const Option<DomainInfo>& _domain, |
| const Option<std::vector<mesos::internal::ResourceQuantities>>& |
| _minAllocatableResources, |
| const size_t maxCompletedFrameworks) |
| { |
| allocationInterval = _allocationInterval; |
| offerCallback = _offerCallback; |
| inverseOfferCallback = _inverseOfferCallback; |
| fairnessExcludeResourceNames = _fairnessExcludeResourceNames; |
| filterGpuResources = _filterGpuResources; |
| domain = _domain; |
| minAllocatableResources = _minAllocatableResources; |
| initialized = true; |
| paused = false; |
| |
| completedFrameworkMetrics = |
| BoundedHashMap<FrameworkID, process::Owned<FrameworkMetrics>>( |
| maxCompletedFrameworks); |
| |
| // Resources for quota'ed roles are allocated separately and prior to |
| // non-quota'ed roles, hence a dedicated sorter for quota'ed roles is |
| // necessary. |
| roleSorter->initialize(fairnessExcludeResourceNames); |
| quotaRoleSorter->initialize(fairnessExcludeResourceNames); |
| |
| VLOG(1) << "Initialized hierarchical allocator process"; |
| |
| // Start a loop to run allocation periodically. |
| PID<HierarchicalAllocatorProcess> _self = self(); |
| |
| loop( |
| None(), // Use `None` so we iterate outside the allocator process. |
| [_allocationInterval]() { |
| return after(_allocationInterval); |
| }, |
| [_self](const Nothing&) { |
| return dispatch(_self, &HierarchicalAllocatorProcess::allocate) |
| .then([]() -> ControlFlow<Nothing> { return Continue(); }); |
| }); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::recover( |
| const int _expectedAgentCount, |
| const hashmap<string, Quota>& quotas) |
| { |
| // Recovery should start before actual allocation starts. |
| CHECK(initialized); |
| CHECK(slaves.empty()); |
| CHECK_EQ(0u, quotaRoleSorter->count()); |
| CHECK(_expectedAgentCount >= 0); |
| |
| // If there is no quota, recovery is a no-op. Otherwise, we need |
| // to delay allocations while agents are reregistering because |
| // otherwise we perform allocations on a partial view of resources! |
| // We would consequently perform unnecessary allocations to satisfy |
| // quota constraints, which can over-allocate non-revocable resources |
| // to roles using quota. Then, frameworks in roles without quota can |
| // be unnecessarily deprived of resources. We may also be unable to |
| // satisfy all of the quota constraints. Repeated master failovers |
| // exacerbate the issue. |
| |
| if (quotas.empty()) { |
| VLOG(1) << "Skipping recovery of hierarchical allocator:" |
| << " nothing to recover"; |
| |
| return; |
| } |
| |
| // NOTE: `quotaRoleSorter` is updated implicitly in `setQuota()`. |
| foreachpair (const string& role, const Quota& quota, quotas) { |
| setQuota(role, quota); |
| } |
| |
| // TODO(alexr): Consider exposing these constants. |
| const Duration ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT = Minutes(10); |
| const double AGENT_RECOVERY_FACTOR = 0.8; |
| |
| // Record the number of expected agents. |
| expectedAgentCount = |
| static_cast<int>(_expectedAgentCount * AGENT_RECOVERY_FACTOR); |
| |
| // Skip recovery if there are no expected agents. This is not strictly |
| // necessary for the allocator to function correctly, but maps better |
| // to expected behavior by the user: the allocator is not paused until |
| // a new agent is added. |
| if (expectedAgentCount.get() == 0) { |
| VLOG(1) << "Skipping recovery of hierarchical allocator:" |
| << " no reconnecting agents to wait for"; |
| |
| return; |
| } |
| |
| // Pause allocation until after a sufficient amount of agents reregister |
| // or a timer expires. |
| pause(); |
| |
| // Setup recovery timer. |
| delay(ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT, self(), &Self::resume); |
| |
| LOG(INFO) << "Triggered allocator recovery: waiting for " |
| << expectedAgentCount.get() << " agents to reconnect or " |
| << ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT << " to pass"; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::addFramework( |
| const FrameworkID& frameworkId, |
| const FrameworkInfo& frameworkInfo, |
| const hashmap<SlaveID, Resources>& used, |
| bool active, |
| const set<string>& suppressedRoles) |
| { |
| CHECK(initialized); |
| CHECK(!frameworks.contains(frameworkId)); |
| |
| frameworks.insert( |
| {frameworkId, Framework(frameworkInfo, suppressedRoles, active)}); |
| |
| const Framework& framework = frameworks.at(frameworkId); |
| |
| foreach (const string& role, framework.roles) { |
| trackFrameworkUnderRole(frameworkId, role); |
| |
| CHECK(frameworkSorters.contains(role)); |
| |
| if (suppressedRoles.count(role)) { |
| frameworkSorters.at(role)->deactivate(frameworkId.value()); |
| framework.metrics->suppressRole(role); |
| } else { |
| frameworkSorters.at(role)->activate(frameworkId.value()); |
| framework.metrics->reviveRole(role); |
| } |
| } |
| |
| // Update the allocation for this framework. |
| foreachpair (const SlaveID& slaveId, const Resources& resources, used) { |
| // TODO(bmahler): The master won't tell us about resources |
| // allocated to agents that have not yet been added, consider |
| // CHECKing this case. |
| if (!slaves.contains(slaveId)) { |
| continue; |
| } |
| |
| // The slave struct will already be aware of the allocated |
| // resources, so we only need to track them in the sorters. |
| trackAllocatedResources(slaveId, frameworkId, resources); |
| } |
| |
| LOG(INFO) << "Added framework " << frameworkId; |
| |
| if (active) { |
| allocate(); |
| } else { |
| deactivateFramework(frameworkId); |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::removeFramework( |
| const FrameworkID& frameworkId) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)) << frameworkId; |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| foreach (const string& role, framework.roles) { |
| // Might not be in 'frameworkSorters[role]' because it |
| // was previously deactivated and never re-added. |
| // |
| // TODO(mzhu): This check may no longer be necessary. |
| if (!frameworkSorters.contains(role) || |
| !frameworkSorters.at(role)->contains(frameworkId.value())) { |
| continue; |
| } |
| |
| hashmap<SlaveID, Resources> allocation = |
| frameworkSorters.at(role)->allocation(frameworkId.value()); |
| |
| // Update the allocation for this framework. |
| foreachpair (const SlaveID& slaveId, |
| const Resources& allocated, |
| allocation) { |
| untrackAllocatedResources(slaveId, frameworkId, allocated); |
| } |
| |
| untrackFrameworkUnderRole(frameworkId, role); |
| } |
| |
| // Transfer ownership of this framework's metrics to |
| // `completedFrameworkMetrics`. |
| completedFrameworkMetrics.set( |
| frameworkId, |
| Owned<FrameworkMetrics>(framework.metrics.release())); |
| |
| frameworks.erase(frameworkId); |
| |
| LOG(INFO) << "Removed framework " << frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::activateFramework( |
| const FrameworkID& frameworkId) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| framework.active = true; |
| |
| // Activate all roles for this framework except the roles that |
| // are marked as deactivated. |
| // Note: A subset of framework roles can be deactivated if the |
| // role is specified in `suppressed_roles` during framework |
| // (re)registration, or via a subsequent `SUPPRESS` call. |
| foreach (const string& role, framework.roles) { |
| CHECK(frameworkSorters.contains(role)); |
| |
| if (!framework.suppressedRoles.count(role)) { |
| frameworkSorters.at(role)->activate(frameworkId.value()); |
| } |
| } |
| |
| LOG(INFO) << "Activated framework " << frameworkId; |
| |
| allocate(); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::deactivateFramework( |
| const FrameworkID& frameworkId) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)) << frameworkId; |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| foreach (const string& role, framework.roles) { |
| CHECK(frameworkSorters.contains(role)); |
| frameworkSorters.at(role)->deactivate(frameworkId.value()); |
| |
| // Note that the Sorter *does not* remove the resources allocated |
| // to this framework. For now, this is important because if the |
| // framework fails over and is activated, we still want a record |
| // of the resources that it is using. We might be able to collapse |
| // the added/removed and activated/deactivated in the future. |
| } |
| |
| framework.active = false; |
| |
| framework.offerFilters.clear(); |
| framework.inverseOfferFilters.clear(); |
| |
| LOG(INFO) << "Deactivated framework " << frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateFramework( |
| const FrameworkID& frameworkId, |
| const FrameworkInfo& frameworkInfo, |
| const set<string>& suppressedRoles) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| const set<string> oldRoles = framework.roles; |
| const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo); |
| const set<string> oldSuppressedRoles = framework.suppressedRoles; |
| |
| foreach (const string& role, newRoles - oldRoles) { |
| framework.metrics->addSubscribedRole(role); |
| |
| // NOTE: It's possible that we're already tracking this framework |
| // under the role because a framework can unsubscribe from a role |
| // while it still has resources allocated to the role. |
| if (!isFrameworkTrackedUnderRole(frameworkId, role)) { |
| trackFrameworkUnderRole(frameworkId, role); |
| } |
| } |
| |
| foreach (const string& role, oldRoles - newRoles) { |
| CHECK(frameworkSorters.contains(role)); |
| |
| frameworkSorters.at(role)->deactivate(frameworkId.value()); |
| |
| // Stop tracking the framework under this role if there are |
| // no longer any resources allocated to it. |
| if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) { |
| untrackFrameworkUnderRole(frameworkId, role); |
| } |
| |
| if (framework.offerFilters.contains(role)) { |
| framework.offerFilters.erase(role); |
| } |
| |
| framework.metrics->removeSubscribedRole(role); |
| framework.suppressedRoles.erase(role); |
| } |
| |
| framework.roles = newRoles; |
| framework.capabilities = frameworkInfo.capabilities(); |
| |
| suppressRoles(frameworkId, suppressedRoles); |
| unsuppressRoles(frameworkId, newRoles - suppressedRoles); |
| |
| CHECK(framework.suppressedRoles == suppressedRoles) |
| << "After updating framework " << frameworkId |
| << " its set of suppressed roles " << stringify(framework.suppressedRoles) |
| << " differs from required " << stringify(suppressedRoles); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::addSlave( |
| const SlaveID& slaveId, |
| const SlaveInfo& slaveInfo, |
| const vector<SlaveInfo::Capability>& capabilities, |
| const Option<Unavailability>& unavailability, |
| const Resources& total, |
| const hashmap<FrameworkID, Resources>& used) |
| { |
| CHECK(initialized); |
| CHECK(!slaves.contains(slaveId)); |
| CHECK_EQ(slaveId, slaveInfo.id()); |
| CHECK(!paused || expectedAgentCount.isSome()); |
| |
| slaves.insert({slaveId, |
| Slave( |
| slaveInfo, |
| protobuf::slave::Capabilities(capabilities), |
| true, |
| total, |
| Resources::sum(used))}); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| // NOTE: We currently implement maintenance in the allocator to be able to |
| // leverage state and features such as the FrameworkSorter and OfferFilter. |
| if (unavailability.isSome()) { |
| slave.maintenance = Slave::Maintenance(unavailability.get()); |
| } |
| |
| trackReservations(total.reservations()); |
| |
| const Resources strippedScalars = total.createStrippedScalarQuantity(); |
| const ResourceQuantities agentScalarQuantities = |
| ResourceQuantities::fromScalarResources(strippedScalars); |
| |
| totalStrippedScalars += strippedScalars; |
| roleSorter->addSlave(slaveId, agentScalarQuantities); |
| |
| foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { |
| sorter->addSlave(slaveId, agentScalarQuantities); |
| } |
| |
| // See comment at `quotaRoleSorter` declaration regarding non-revocable. |
| quotaRoleSorter->addSlave( |
| slaveId, |
| ResourceQuantities::fromScalarResources(total.nonRevocable().scalars())); |
| |
| foreachpair (const FrameworkID& frameworkId, |
| const Resources& allocation, |
| used) { |
| // There are two cases here: |
| // |
| // (1) The framework has already been added to the allocator. |
| // In this case, we track the allocation in the sorters. |
| // |
| // (2) The framework has not yet been added to the allocator. |
| // The master will imminently add the framework using |
| // the `FrameworkInfo` recovered from the agent, and in |
| // the interim we do not track the resources allocated to |
| // this framework. This leaves a small window where the |
| // role sorting will under-account for the roles belonging |
| // to this framework. |
| // |
| // TODO(bmahler): Fix the issue outlined in (2). |
| if (!frameworks.contains(frameworkId)) { |
| continue; |
| } |
| |
| trackAllocatedResources(slaveId, frameworkId, allocation); |
| } |
| |
| // If we have just a number of recovered agents, we cannot distinguish |
| // between "old" agents from the registry and "new" ones joined after |
| // recovery has started. Because we do not persist enough information |
| // to base logical decisions on, any accounting algorithm here will be |
| // crude. Hence we opted for checking whether a certain amount of cluster |
| // capacity is back online, so that we are reasonably confident that we |
| // will not over-commit too many resources to quota that we will not be |
| // able to revoke. |
| if (paused && |
| expectedAgentCount.isSome() && |
| (static_cast<int>(slaves.size()) >= expectedAgentCount.get())) { |
| VLOG(1) << "Recovery complete: sufficient amount of agents added; " |
| << slaves.size() << " agents known to the allocator"; |
| |
| expectedAgentCount = None(); |
| resume(); |
| } |
| |
| LOG(INFO) << "Added agent " << slaveId << " (" << slave.info.hostname() << ")" |
| << " with " << slave.getTotal() |
| << " (allocated: " << slave.getAllocated() << ")"; |
| |
| allocate(slaveId); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::removeSlave( |
| const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| |
| // TODO(bmahler): Per MESOS-621, this should remove the allocations |
| // that any frameworks have on this slave. Otherwise the caller may |
| // "leak" allocated resources accidentally if they forget to recover |
| // all the resources. Fixing this would require more information |
| // than what we currently track in the allocator. |
| |
| roleSorter->removeSlave(slaveId); |
| |
| foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { |
| sorter->removeSlave(slaveId); |
| } |
| |
| quotaRoleSorter->removeSlave(slaveId); |
| |
| untrackReservations(slaves.at(slaveId).getTotal().reservations()); |
| |
| const Resources strippedScalars = |
| slaves.at(slaveId).getTotal().createStrippedScalarQuantity(); |
| |
| CHECK(totalStrippedScalars.contains(strippedScalars)); |
| totalStrippedScalars -= strippedScalars; |
| |
| slaves.erase(slaveId); |
| allocationCandidates.erase(slaveId); |
| |
| removeFilters(slaveId); |
| |
| LOG(INFO) << "Removed agent " << slaveId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateSlave( |
| const SlaveID& slaveId, |
| const SlaveInfo& info, |
| const Option<Resources>& total, |
| const Option<vector<SlaveInfo::Capability>>& capabilities) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| CHECK_EQ(slaveId, info.id()); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| bool updated = false; |
| |
| // Remove all offer filters for this slave if it was restarted with changed |
| // attributes. We do this because schedulers might have decided that they're |
| // not interested in offers from this slave based on the non-presence of some |
| // required attributes, and right now they have no other way of learning |
| // about this change. |
| // TODO(bennoe): Once the agent lifecycle design is implemented, there is a |
| // better way to notify frameworks about such changes and let them make this |
| // decision. We should think about ways to safely remove this check at that |
| // point in time. |
| if (!(Attributes(info.attributes()) == Attributes(slave.info.attributes()))) { |
| updated = true; |
| removeFilters(slaveId); |
| } |
| |
| if (!(slave.info == info)) { |
| updated = true; |
| |
| // We unconditionally overwrite the old domain and hostname: Even though |
| // the master places some restrictions on this (i.e. agents are not allowed |
| // to reregister with a different hostname) inside the allocator it |
| // doesn't matter, as the algorithm will work correctly either way. |
| slave.info = info; |
| } |
| |
| // Update agent capabilities. |
| if (capabilities.isSome()) { |
| protobuf::slave::Capabilities newCapabilities(capabilities.get()); |
| protobuf::slave::Capabilities oldCapabilities(slave.capabilities); |
| |
| slave.capabilities = newCapabilities; |
| |
| if (newCapabilities != oldCapabilities) { |
| updated = true; |
| |
| LOG(INFO) << "Agent " << slaveId << " (" << slave.info.hostname() << ")" |
| << " updated with capabilities " << slave.capabilities; |
| } |
| } |
| |
| if (total.isSome()) { |
| updated = updateSlaveTotal(slaveId, total.get()) || updated; |
| |
| LOG(INFO) << "Agent " << slaveId << " (" << slave.info.hostname() << ")" |
| << " updated with total resources " << total.get(); |
| } |
| |
| if (updated) { |
| allocate(slaveId); |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::addResourceProvider( |
| const SlaveID& slaveId, |
| const Resources& total, |
| const hashmap<FrameworkID, Resources>& used) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| |
| foreachpair (const FrameworkID& frameworkId, |
| const Resources& allocation, |
| used) { |
| // There are two cases here: |
| // |
| // (1) The framework has already been added to the allocator. |
| // In this case, we track the allocation in the sorters. |
| // |
| // (2) The framework has not yet been added to the allocator. |
| // We do not track the resources allocated to this |
| // framework. This leaves a small window where the role |
| // sorting will under-account for the roles belonging |
| // to this framework. This case should never occur since |
| // the master will always add the framework first. |
| if (!frameworks.contains(frameworkId)) { |
| continue; |
| } |
| |
| trackAllocatedResources(slaveId, frameworkId, allocation); |
| } |
| |
| Slave& slave = slaves.at(slaveId); |
| updateSlaveTotal(slaveId, slave.getTotal() + total); |
| slave.allocate(Resources::sum(used)); |
| |
| VLOG(1) |
| << "Grew agent " << slaveId << " by " |
| << total << " (total), " |
| << used << " (used)"; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::removeFilters(const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| |
| foreachvalue (Framework& framework, frameworks) { |
| framework.inverseOfferFilters.erase(slaveId); |
| |
| // Need a typedef here, otherwise the preprocessor gets confused |
| // by the comma in the template argument list. |
| typedef hashmap<SlaveID, hashset<shared_ptr<OfferFilter>>> Filters; |
| foreachvalue (Filters& filters, framework.offerFilters) { |
| filters.erase(slaveId); |
| } |
| } |
| |
| LOG(INFO) << "Removed all filters for agent " << slaveId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::activateSlave( |
| const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| |
| slaves.at(slaveId).activated = true; |
| |
| LOG(INFO) << "Agent " << slaveId << " reactivated"; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::deactivateSlave( |
| const SlaveID& slaveId) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| |
| slaves.at(slaveId).activated = false; |
| |
| LOG(INFO) << "Agent " << slaveId << " deactivated"; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateWhitelist( |
| const Option<hashset<string>>& _whitelist) |
| { |
| CHECK(initialized); |
| |
| whitelist = _whitelist; |
| |
| if (whitelist.isSome()) { |
| LOG(INFO) << "Updated agent whitelist: " << stringify(whitelist.get()); |
| |
| if (whitelist->empty()) { |
| LOG(WARNING) << "Whitelist is empty, no offers will be made!"; |
| } |
| } else { |
| LOG(INFO) << "Advertising offers for all agents"; |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::requestResources( |
| const FrameworkID& frameworkId, |
| const vector<Request>& requests) |
| { |
| CHECK(initialized); |
| |
| LOG(INFO) << "Received resource request from framework " << frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateAllocation( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const Resources& offeredResources, |
| const vector<ResourceConversion>& conversions) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| // We require that an allocation is tied to a single role. |
| // |
| // TODO(bmahler): The use of `Resources::allocations()` induces |
| // unnecessary copying of `Resources` objects (which is expensive |
| // at the time this was written). |
| hashmap<string, Resources> allocations = offeredResources.allocations(); |
| |
| CHECK_EQ(1u, allocations.size()); |
| |
| string role = allocations.begin()->first; |
| |
| CHECK(frameworkSorters.contains(role)); |
| |
| const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); |
| const Resources frameworkAllocation = |
| frameworkSorter->allocation(frameworkId.value(), slaveId); |
| |
| // We keep a copy of the offered resources here and it is updated |
| // by the specified resource conversions. |
| // |
| // The resources in the resource conversions should have been |
| // normalized by the master (contains proper AllocationInfo). |
| // |
| // TODO(bmahler): Check that the resources in the resource |
| // conversions have AllocationInfo set. The master should enforce |
| // this. E.g. |
| // |
| // foreach (const ResourceConversion& conversion, conversions) { |
| // CHECK_NONE(validateConversionOnAllocatedResources(conversion)); |
| // } |
| Try<Resources> _updatedOfferedResources = offeredResources.apply(conversions); |
| CHECK_SOME(_updatedOfferedResources); |
| |
| const Resources& updatedOfferedResources = _updatedOfferedResources.get(); |
| |
| // Update the per-slave allocation. |
| slave.unallocate(offeredResources); |
| slave.allocate(updatedOfferedResources); |
| |
| // Update the allocation in the framework sorter. |
| frameworkSorter->update( |
| frameworkId.value(), |
| slaveId, |
| offeredResources, |
| updatedOfferedResources); |
| |
| // Update the allocation in the role sorter. |
| roleSorter->update( |
| role, |
| slaveId, |
| offeredResources, |
| updatedOfferedResources); |
| |
| // Update the allocated resources in the quota sorter. We only update |
| // the allocated resources if this role has quota set. |
| if (quotas.contains(role)) { |
| // See comment at `quotaRoleSorter` declaration regarding non-revocable. |
| quotaRoleSorter->update( |
| role, |
| slaveId, |
| offeredResources.nonRevocable(), |
| updatedOfferedResources.nonRevocable()); |
| } |
| |
| // Update the agent total resources so they are consistent with the updated |
| // allocation. We do not directly use `updatedOfferedResources` here because |
| // the agent's total resources shouldn't contain: |
| // 1. The additionally allocated shared resources. |
| // 2. `AllocationInfo` as set in `updatedOfferedResources`. |
| // |
| // We strip `AllocationInfo` from conversions in order to apply them |
| // successfully, since agent's total is stored as unallocated resources. |
| vector<ResourceConversion> strippedConversions; |
| Resources removedResources; |
| foreach (const ResourceConversion& conversion, conversions) { |
| // TODO(jieyu): Ideally, we should make sure agent's total |
| // resources are consistent with agent's allocation in terms of |
| // shared resources. In other words, we should increase agent's |
| // total resources as well for those additional allocation we did |
| // for shared resources. However, that means we need to update the |
| // agent's total resources when performing allocation for shared |
| // resources (in `__allocate()`). For now, we detect "additional" |
| // allocation for shared resources by checking if a conversion has |
| // an empty `consumed` field. |
| if (conversion.consumed.empty()) { |
| continue; |
| } |
| |
| // NOTE: For now, a resource conversion must either not change the resource |
| // quantities, or completely remove the consumed resources. See MESOS-8825. |
| if (conversion.converted.empty()) { |
| removedResources += conversion.consumed; |
| } |
| |
| Resources consumed = conversion.consumed; |
| Resources converted = conversion.converted; |
| |
| consumed.unallocate(); |
| converted.unallocate(); |
| |
| strippedConversions.emplace_back(consumed, converted); |
| } |
| |
| Try<Resources> updatedTotal = slave.getTotal().apply(strippedConversions); |
| CHECK_SOME(updatedTotal); |
| |
| updateSlaveTotal(slaveId, updatedTotal.get()); |
| |
| const Resources updatedFrameworkAllocation = |
| frameworkSorter->allocation(frameworkId.value(), slaveId); |
| |
| // Check that the changed quantities af the framework's allocation is exactly |
| // the same as the resources removed by the resource conversions. |
| // |
| // TODO(chhsiao): Revisit this constraint if we want to support other type of |
| // resource conversions. See MESOS-9015. |
| const Resources removedAllocationQuantities = |
| frameworkAllocation.createStrippedScalarQuantity() - |
| updatedFrameworkAllocation.createStrippedScalarQuantity(); |
| CHECK_EQ( |
| removedAllocationQuantities, |
| removedResources.createStrippedScalarQuantity()); |
| |
| LOG(INFO) << "Updated allocation of framework " << frameworkId |
| << " on agent " << slaveId |
| << " from " << frameworkAllocation |
| << " to " << updatedFrameworkAllocation; |
| } |
| |
| |
| Future<Nothing> HierarchicalAllocatorProcess::updateAvailable( |
| const SlaveID& slaveId, |
| const vector<Offer::Operation>& operations) |
| { |
| // Note that the operations may contain allocated resources, |
| // however such operations can be applied to unallocated |
| // resources unambiguously, so we don't have a strict CHECK |
| // for the operations to contain only unallocated resources. |
| |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| // It's possible for this 'apply' to fail here because a call to |
| // 'allocate' could have been enqueued by the allocator itself |
| // just before master's request to enqueue 'updateAvailable' |
| // arrives to the allocator. |
| // |
| // Master -------R------------ |
| // \----+ |
| // | |
| // Allocator --A-----A-U---A-- |
| // \___/ \___/ |
| // |
| // where A = allocate, R = reserve, U = updateAvailable |
| Try<Resources> updatedAvailable = slave.getAvailable().apply(operations); |
| if (updatedAvailable.isError()) { |
| VLOG(1) << "Failed to update available resources on agent " << slaveId |
| << ": " << updatedAvailable.error(); |
| return Failure(updatedAvailable.error()); |
| } |
| |
| // Update the total resources. |
| Try<Resources> updatedTotal = slave.getTotal().apply(operations); |
| CHECK_SOME(updatedTotal); |
| |
| // Update the total resources in the allocator and role and quota sorters. |
| updateSlaveTotal(slaveId, updatedTotal.get()); |
| |
| return Nothing(); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateUnavailability( |
| const SlaveID& slaveId, |
| const Option<Unavailability>& unavailability) |
| { |
| CHECK(initialized); |
| CHECK(slaves.contains(slaveId)); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| // NOTE: We currently implement maintenance in the allocator to be able to |
| // leverage state and features such as the FrameworkSorter and OfferFilter. |
| |
| // We explicitly remove all filters for the inverse offers of this slave. We |
| // do this because we want to force frameworks to reassess the calculations |
| // they have made to respond to the inverse offer. Unavailability of a slave |
| // can have a large effect on failure domain calculations and inter-leaved |
| // unavailability schedules. |
| foreachvalue (Framework& framework, frameworks) { |
| framework.inverseOfferFilters.erase(slaveId); |
| } |
| |
| // Remove any old unavailability. |
| slave.maintenance = None(); |
| |
| // If we have a new unavailability. |
| if (unavailability.isSome()) { |
| slave.maintenance = Slave::Maintenance(unavailability.get()); |
| } |
| |
| allocate(slaveId); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateInverseOffer( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Option<UnavailableResources>& unavailableResources, |
| const Option<InverseOfferStatus>& status, |
| const Option<Filters>& filters) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| CHECK(slaves.contains(slaveId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| Slave& slave = slaves.at(slaveId); |
| |
| CHECK(slave.maintenance.isSome()) |
| << "Agent " << slaveId |
| << " (" << slave.info.hostname() << ") should have maintenance scheduled"; |
| |
| // NOTE: We currently implement maintenance in the allocator to be able to |
| // leverage state and features such as the FrameworkSorter and OfferFilter. |
| |
| // We use a reference by alias because we intend to modify the |
| // `maintenance` and to improve readability. |
| Slave::Maintenance& maintenance = slave.maintenance.get(); |
| |
| // Only handle inverse offers that we currently have outstanding. If it is not |
| // currently outstanding this means it is old and can be safely ignored. |
| if (maintenance.offersOutstanding.contains(frameworkId)) { |
| // We always remove the outstanding offer so that we will send a new offer |
| // out the next time we schedule inverse offers. |
| maintenance.offersOutstanding.erase(frameworkId); |
| |
| // If the response is `Some`, this means the framework responded. Otherwise |
| // if it is `None` the inverse offer timed out or was rescinded. |
| if (status.isSome()) { |
| // For now we don't allow frameworks to respond with `UNKNOWN`. The caller |
| // should guard against this. This goes against the pattern of not |
| // checking external invariants; however, the allocator and master are |
| // currently so tightly coupled that this check is valuable. |
| CHECK_NE(status->status(), InverseOfferStatus::UNKNOWN); |
| |
| // If the framework responded, we update our state to match. |
| maintenance.statuses[frameworkId].CopyFrom(status.get()); |
| } |
| } |
| |
| // No need to install filters if `filters` is none. |
| if (filters.isNone()) { |
| return; |
| } |
| |
| // Create a refused inverse offer filter. |
| Try<Duration> timeout = Duration::create(Filters().refuse_seconds()); |
| |
| if (filters->refuse_seconds() > Days(365).secs()) { |
| LOG(WARNING) << "Using 365 days to create the refused inverse offer" |
| << " filter because the input value is too big"; |
| |
| timeout = Days(365); |
| } else if (filters->refuse_seconds() < 0) { |
| LOG(WARNING) << "Using the default value of 'refuse_seconds' to create" |
| << " the refused inverse offer filter because the input" |
| << " value is negative"; |
| |
| timeout = Duration::create(Filters().refuse_seconds()); |
| } else { |
| timeout = Duration::create(filters->refuse_seconds()); |
| |
| if (timeout.isError()) { |
| LOG(WARNING) << "Using the default value of 'refuse_seconds' to create" |
| << " the refused inverse offer filter because the input" |
| << " value is invalid: " + timeout.error(); |
| |
| timeout = Duration::create(Filters().refuse_seconds()); |
| } |
| } |
| |
| CHECK_SOME(timeout); |
| |
| if (timeout.get() != Duration::zero()) { |
| VLOG(1) << "Framework " << frameworkId |
| << " filtered inverse offers from agent " << slaveId |
| << " for " << timeout.get(); |
| |
| // Create a new inverse offer filter and delay its expiration. |
| shared_ptr<RefusedInverseOfferFilter> inverseOfferFilter = |
| make_shared<RefusedInverseOfferFilter>(timeout.get()); |
| |
| framework.inverseOfferFilters[slaveId].insert(inverseOfferFilter); |
| |
| weak_ptr<InverseOfferFilter> weakPtr = inverseOfferFilter; |
| |
| inverseOfferFilter->expired() |
| .onReady(defer(self(), [=](Nothing) { |
| expire(frameworkId, slaveId, weakPtr); |
| })); |
| } |
| } |
| |
| |
| Future<hashmap<SlaveID, hashmap<FrameworkID, InverseOfferStatus>>> |
| HierarchicalAllocatorProcess::getInverseOfferStatuses() |
| { |
| CHECK(initialized); |
| |
| hashmap<SlaveID, hashmap<FrameworkID, InverseOfferStatus>> result; |
| |
| // Make a copy of the most recent statuses. |
| foreachpair (const SlaveID& id, const Slave& slave, slaves) { |
| if (slave.maintenance.isSome()) { |
| result[id] = slave.maintenance->statuses; |
| } |
| } |
| |
| return result; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::recoverResources( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const Resources& resources, |
| const Option<Filters>& filters) |
| { |
| CHECK(initialized); |
| |
| if (resources.empty()) { |
| return; |
| } |
| |
| // For now, we require that resources are recovered within a single |
| // allocation role (since filtering in the same manner across roles |
| // seems undesirable). |
| // |
| // TODO(bmahler): The use of `Resources::allocations()` induces |
| // unnecessary copying of `Resources` objects (which is expensive |
| // at the time this was written). |
| hashmap<string, Resources> allocations = resources.allocations(); |
| |
| CHECK_EQ(1u, allocations.size()); |
| |
| string role = allocations.begin()->first; |
| |
| // Updated resources allocated to framework (if framework still |
| // exists, which it might not in the event that we dispatched |
| // Master::offer before we received |
| // MesosAllocatorProcess::removeFramework or |
| // MesosAllocatorProcess::deactivateFramework, in which case we will |
| // have already recovered all of its resources). |
| if (frameworks.contains(frameworkId)) { |
| CHECK(frameworkSorters.contains(role)); |
| |
| const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); |
| |
| if (frameworkSorter->contains(frameworkId.value())) { |
| untrackAllocatedResources(slaveId, frameworkId, resources); |
| |
| // Stop tracking the framework under this role if it's no longer |
| // subscribed and no longer has resources allocated to the role. |
| if (frameworks.at(frameworkId).roles.count(role) == 0 && |
| frameworkSorter->allocation(frameworkId.value()).empty()) { |
| untrackFrameworkUnderRole(frameworkId, role); |
| } |
| } |
| } |
| |
| // Update resources allocated on slave (if slave still exists, |
| // which it might not in the event that we dispatched Master::offer |
| // before we received Allocator::removeSlave). |
| if (slaves.contains(slaveId)) { |
| Slave& slave = slaves.at(slaveId); |
| |
| CHECK(slave.getAllocated().contains(resources)) |
| << slave.getAllocated() << " does not contain " << resources; |
| |
| slave.unallocate(resources); |
| |
| VLOG(1) << "Recovered " << resources |
| << " (total: " << slave.getTotal() |
| << ", allocated: " << slave.getAllocated() << ")" |
| << " on agent " << slaveId |
| << " from framework " << frameworkId; |
| } |
| |
| // No need to install the filter if 'filters' is none. |
| if (filters.isNone()) { |
| return; |
| } |
| |
| // No need to install the filter if slave/framework does not exist. |
| if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) { |
| return; |
| } |
| |
| // Create a refused resources filter. |
| Try<Duration> timeout = Duration::create(Filters().refuse_seconds()); |
| |
| if (filters->refuse_seconds() > Days(365).secs()) { |
| LOG(WARNING) << "Using 365 days to create the refused resources offer" |
| << " filter because the input value is too big"; |
| |
| timeout = Days(365); |
| } else if (filters->refuse_seconds() < 0) { |
| LOG(WARNING) << "Using the default value of 'refuse_seconds' to create" |
| << " the refused resources offer filter because the input" |
| << " value is negative"; |
| |
| timeout = Duration::create(Filters().refuse_seconds()); |
| } else { |
| timeout = Duration::create(filters->refuse_seconds()); |
| |
| if (timeout.isError()) { |
| LOG(WARNING) << "Using the default value of 'refuse_seconds' to create" |
| << " the refused resources offer filter because the input" |
| << " value is invalid: " + timeout.error(); |
| |
| timeout = Duration::create(Filters().refuse_seconds()); |
| } |
| } |
| |
| CHECK_SOME(timeout); |
| |
| if (timeout.get() != Duration::zero()) { |
| VLOG(1) << "Framework " << frameworkId |
| << " filtered agent " << slaveId |
| << " for " << timeout.get(); |
| |
| // Expire the filter after both an `allocationInterval` and the |
| // `timeout` have elapsed. This ensures that the filter does not |
| // expire before we perform the next allocation for this agent, |
| // see MESOS-4302 for more information. |
| // |
| // Because the next periodic allocation goes through a dispatch |
| // after `allocationInterval`, we do the same for `expire()` |
| // (with a helper `_expire()`) to achieve the above. |
| // |
| // TODO(alexr): If we allocated upon resource recovery |
| // (MESOS-3078), we would not need to increase the timeout here. |
| timeout = std::max(allocationInterval, timeout.get()); |
| |
| // Create a new filter. Note that we unallocate the resources |
| // since filters are applied per-role already. |
| Resources unallocated = resources; |
| unallocated.unallocate(); |
| |
| shared_ptr<RefusedOfferFilter> offerFilter = |
| make_shared<RefusedOfferFilter>(unallocated, timeout.get()); |
| |
| frameworks.at(frameworkId) |
| .offerFilters[role][slaveId].insert(offerFilter); |
| |
| weak_ptr<OfferFilter> weakPtr = offerFilter; |
| |
| offerFilter->expired() |
| .onReady(defer(self(), [=](Nothing) { |
| expire(frameworkId, role, slaveId, weakPtr); |
| })); |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::suppressRoles( |
| const FrameworkID& frameworkId, |
| const set<string>& roles) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| // Deactivating the framework in the sorter is fine as long as |
| // SUPPRESS is not parameterized. When parameterization is added, |
| // we have to differentiate between the cases here. |
| |
| foreach (const string& role, roles) { |
| CHECK(frameworkSorters.contains(role)); |
| |
| frameworkSorters.at(role)->deactivate(frameworkId.value()); |
| framework.suppressedRoles.insert(role); |
| framework.metrics->suppressRole(role); |
| } |
| |
| // TODO(bmahler): This logs roles that were already suppressed, |
| // only log roles that transitioned from unsuppressed -> suppressed. |
| LOG(INFO) << "Suppressed offers for roles " << stringify(roles) |
| << " of framework " << frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::suppressOffers( |
| const FrameworkID& frameworkId, |
| const set<string>& roles_) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| const set<string>& roles = roles_.empty() ? framework.roles : roles_; |
| suppressRoles(frameworkId, roles); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::unsuppressRoles( |
| const FrameworkID& frameworkId, |
| const set<string>& roles) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| |
| // Activating the framework in the sorter is fine as long as |
| // SUPPRESS is not parameterized. When parameterization is added, |
| // we may need to differentiate between the cases here. |
| foreach (const string& role, roles) { |
| CHECK(frameworkSorters.contains(role)); |
| |
| frameworkSorters.at(role)->activate(frameworkId.value()); |
| framework.suppressedRoles.erase(role); |
| framework.metrics->reviveRole(role); |
| } |
| |
| // TODO(bmahler): This logs roles that were already unsuppressed, |
| // only log roles that transitioned from suppressed -> unsuppressed. |
| LOG(INFO) << "Unsuppressed offers for roles " << stringify(roles) |
| << " of framework " << frameworkId; |
| } |
| |
| |
| void HierarchicalAllocatorProcess::reviveOffers( |
| const FrameworkID& frameworkId, |
| const set<string>& roles_) |
| { |
| CHECK(initialized); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| Framework& framework = frameworks.at(frameworkId); |
| framework.inverseOfferFilters.clear(); |
| |
| const set<string>& roles = roles_.empty() ? framework.roles : roles_; |
| foreach (const string& role, roles) { |
| framework.offerFilters.erase(role); |
| } |
| |
| unsuppressRoles(frameworkId, roles); |
| |
| LOG(INFO) << "Revived roles " << stringify(roles) |
| << " of framework " << frameworkId; |
| |
| allocate(); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::setQuota( |
| const string& role, |
| const Quota& quota) |
| { |
| CHECK(initialized); |
| |
| // This method should be called by the master only if the quota for |
| // the role is not set. Setting quota differs from updating it because |
| // the former moves the role to a different allocation group with a |
| // dedicated sorter, while the later just updates the actual quota. |
| CHECK(!quotas.contains(role)); |
| |
| // Persist quota in memory and add the role into the corresponding |
| // allocation group. |
| quotas[role] = quota; |
| quotaRoleSorter->add(role); |
| quotaRoleSorter->activate(role); |
| |
| // Copy allocation information for the quota'ed role. |
| if (roleSorter->contains(role)) { |
| foreachpair ( |
| const SlaveID& slaveId, |
| const Resources& resources, |
| roleSorter->allocation(role)) { |
| // See comment at `quotaRoleSorter` declaration regarding non-revocable. |
| quotaRoleSorter->allocated(role, slaveId, resources.nonRevocable()); |
| } |
| } |
| |
| metrics.setQuota(role, quota); |
| |
| // TODO(alexr): Print all quota info for the role. |
| LOG(INFO) << "Set quota " << quota.info.guarantee() << " for role '" << role |
| << "'"; |
| |
| // NOTE: Since quota changes do not result in rebalancing of |
| // offered resources, we do not trigger an allocation here; the |
| // quota change will be reflected in subsequent allocations. |
| // |
| // If we add the ability for quota changes to incur a rebalancing |
| // of offered resources, then we should trigger that here. |
| } |
| |
| |
| void HierarchicalAllocatorProcess::removeQuota( |
| const string& role) |
| { |
| CHECK(initialized); |
| |
| // Do not allow removing quota if it is not set. |
| CHECK(quotas.contains(role)); |
| CHECK(quotaRoleSorter->contains(role)); |
| |
| // TODO(alexr): Print all quota info for the role. |
| LOG(INFO) << "Removed quota " << quotas[role].info.guarantee() |
| << " for role '" << role << "'"; |
| |
| // Remove the role from the quota'ed allocation group. |
| quotas.erase(role); |
| quotaRoleSorter->remove(role); |
| |
| metrics.removeQuota(role); |
| |
| // NOTE: Since quota changes do not result in rebalancing of |
| // offered resources, we do not trigger an allocation here; the |
| // quota change will be reflected in subsequent allocations. |
| // |
| // If we add the ability for quota changes to incur a rebalancing |
| // of offered resources, then we should trigger that here. |
| } |
| |
| |
| void HierarchicalAllocatorProcess::updateWeights( |
| const vector<WeightInfo>& weightInfos) |
| { |
| CHECK(initialized); |
| |
| foreach (const WeightInfo& weightInfo, weightInfos) { |
| CHECK(weightInfo.has_role()); |
| |
| quotaRoleSorter->updateWeight(weightInfo.role(), weightInfo.weight()); |
| roleSorter->updateWeight(weightInfo.role(), weightInfo.weight()); |
| } |
| |
| // NOTE: Since weight changes do not result in rebalancing of |
| // offered resources, we do not trigger an allocation here; the |
| // weight change will be reflected in subsequent allocations. |
| // |
| // If we add the ability for weight changes to incur a rebalancing |
| // of offered resources, then we should trigger that here. |
| } |
| |
| |
| void HierarchicalAllocatorProcess::pause() |
| { |
| if (!paused) { |
| VLOG(1) << "Allocation paused"; |
| |
| paused = true; |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::resume() |
| { |
| if (paused) { |
| VLOG(1) << "Allocation resumed"; |
| |
| paused = false; |
| } |
| } |
| |
| |
| Future<Nothing> HierarchicalAllocatorProcess::allocate() |
| { |
| return allocate(slaves.keys()); |
| } |
| |
| |
| Future<Nothing> HierarchicalAllocatorProcess::allocate( |
| const SlaveID& slaveId) |
| { |
| hashset<SlaveID> slaves({slaveId}); |
| return allocate(slaves); |
| } |
| |
| |
| Future<Nothing> HierarchicalAllocatorProcess::allocate( |
| const hashset<SlaveID>& slaveIds) |
| { |
| if (paused) { |
| VLOG(2) << "Skipped allocation because the allocator is paused"; |
| |
| return Nothing(); |
| } |
| |
| allocationCandidates |= slaveIds; |
| |
| if (allocation.isNone() || !allocation->isPending()) { |
| metrics.allocation_run_latency.start(); |
| allocation = dispatch(self(), &Self::_allocate); |
| } |
| |
| return allocation.get(); |
| } |
| |
| |
| Nothing HierarchicalAllocatorProcess::_allocate() |
| { |
| metrics.allocation_run_latency.stop(); |
| |
| if (paused) { |
| VLOG(2) << "Skipped allocation because the allocator is paused"; |
| |
| return Nothing(); |
| } |
| |
| ++metrics.allocation_runs; |
| |
| Stopwatch stopwatch; |
| stopwatch.start(); |
| metrics.allocation_run.start(); |
| |
| __allocate(); |
| |
| // NOTE: For now, we implement maintenance inverse offers within the |
| // allocator. We leverage the existing timer/cycle of offers to also do any |
| // "deallocation" (inverse offers) necessary to satisfy maintenance needs. |
| deallocate(); |
| |
| metrics.allocation_run.stop(); |
| |
| VLOG(1) << "Performed allocation for " << allocationCandidates.size() |
| << " agents in " << stopwatch.elapsed(); |
| |
| // Clear the candidates on completion of the allocation run. |
| allocationCandidates.clear(); |
| |
| return Nothing(); |
| } |
| |
| |
| // TODO(alexr): Consider factoring out the quota allocation logic. |
| void HierarchicalAllocatorProcess::__allocate() |
| { |
| // Compute the offerable resources, per framework: |
| // (1) For reserved resources on the slave, allocate these to a |
| // framework having the corresponding role. |
| // (2) For unreserved resources on the slave, allocate these |
| // to a framework of any role. |
| hashmap<FrameworkID, hashmap<string, hashmap<SlaveID, Resources>>> offerable; |
| |
| // NOTE: This function can operate on a small subset of |
| // `allocationCandidates`, we have to make sure that we don't |
| // assume cluster knowledge when summing resources from that set. |
| |
| vector<SlaveID> slaveIds; |
| slaveIds.reserve(allocationCandidates.size()); |
| |
| // Filter out non-whitelisted, removed, and deactivated slaves |
| // in order not to send offers for them. |
| foreach (const SlaveID& slaveId, allocationCandidates) { |
| if (isWhitelisted(slaveId) && |
| slaves.contains(slaveId) && |
| slaves.at(slaveId).activated) { |
| slaveIds.push_back(slaveId); |
| } |
| } |
| |
| // Randomize the order in which slaves' resources are allocated. |
| // |
| // TODO(vinod): Implement a smarter sorting algorithm. |
| std::random_shuffle(slaveIds.begin(), slaveIds.end()); |
| |
| // Returns the __quantity__ of resources allocated to a role with |
| // non-default quota. Since we account for reservations and persistent |
| // volumes toward quota, we strip reservation and persistent volumes |
| // related information for comparability. The result is used to |
| // determine whether a role's quota guarantee is satisfied, and |
| // also to determine how many resources the role would need in |
| // order to meet its quota guarantee. |
| // |
| // NOTE: Revocable resources are excluded in `quotaRoleSorter`. |
| auto getQuotaRoleAllocatedScalarQuantities = [this](const string& role) { |
| CHECK(quotas.contains(role)); |
| return quotaRoleSorter->allocationScalarQuantities(role); |
| }; |
| |
| // Returns the result of shrinking the provided resources down to the |
| // target scalar quantities. If a resource does not have a target |
| // quantity provided, it will not be shrunk. |
| // |
| // Note that some resources are indivisible (e.g. MOUNT volume) and |
| // may be excluded in entirety in order to achieve the target size |
| // (this may lead to the result size being smaller than the target size). |
| // |
| // Note also that there may be more than one result that satisfies |
| // the target sizes (e.g. need to exclude 1 of 2 disks); this function |
| // will make a random choice in these cases. |
| auto shrinkResources = |
| [](const Resources& resources, |
| hashmap<string, Value::Scalar> targetScalarQuantites) { |
| google::protobuf::RepeatedPtrField<Resource> |
| resourceVector = resources; |
| |
| random_shuffle(resourceVector.begin(), resourceVector.end()); |
| |
| Resources result; |
| foreach (Resource& resource, resourceVector) { |
| if (!targetScalarQuantites.contains(resource.name())) { |
| // Resource that has no target quantity is left as is. |
| result += std::move(resource); |
| continue; |
| } |
| |
| Option<Value::Scalar> limitScalar = |
| targetScalarQuantites.get(resource.name()); |
| |
| if (Resources::shrink(&resource, limitScalar.get())) { |
| targetScalarQuantites[resource.name()] -= resource.scalar(); |
| result += std::move(resource); |
| } |
| } |
| |
| return result; |
| }; |
| |
| // To enforce quota, we keep track of consumed quota for roles with a |
| // non-default quota. |
| // |
| // NOTE: We build the map here to avoid repetitive aggregation in the |
| // allocation loop. But this map will still need to be updated in the |
| // allocation loop as we make new allocations. |
| // |
| // TODO(mzhu): Build and persist this information across allocation cycles in |
| // track/untrackAllocatedResources(). |
| // |
| // TODO(mzhu): Ideally, we want the sorter to track consumed quota. It then |
| // could use consumed quota instead of allocated resources (the former |
| // includes unallocated reservations while the latter does not) to calculate |
| // the DRF share. This would help to: |
| // |
| // (1) Solve the fairness issue when roles with unallocated |
| // reservations may game the allocator (See MESOS-8299). |
| // |
| // (2) Simplify the quota enforcement logic -- the allocator |
| // would no longer need to track reservations separately. |
| // |
| // Note these are __quantities__ with no meta-data. |
| hashmap<string, Resources> rolesConsumedQuotaScalarQuantites; |
| |
| // We charge a role against its quota by considering its allocation as well |
| // as any unallocated reservations since reservations are bound to the role. |
| // In other words, we always consider reservations as consuming quota, |
| // regardless of whether they are allocated. |
| // It is calculated as: |
| // |
| // Consumed Quota = reservations + unreserved allocation |
| // = reservations + allocation - allocated reservations |
| foreachkey (const string& role, quotas) { |
| // First add reservations. |
| rolesConsumedQuotaScalarQuantites[role] += |
| reservationScalarQuantities.get(role).getOrElse(Resources()); |
| |
| // Then add allocated resources. |
| rolesConsumedQuotaScalarQuantites[role] += |
| getQuotaRoleAllocatedScalarQuantities(role); |
| |
| // Lastly subtract allocated reservations on each agent. |
| foreachvalue ( |
| const Resources& resources, quotaRoleSorter->allocation(role)) { |
| rolesConsumedQuotaScalarQuantites[role] -= |
| resources.reserved().createStrippedScalarQuantity(); |
| } |
| } |
| |
| // We need to constantly make sure that we are holding back enough |
| // unreserved resources that the remaining quota guarantee can later |
| // be satisfied when needed: |
| // |
| // Required unreserved headroom = |
| // sum (guarantee - consumed quota) for each role. |
| // |
| // Given the above, if a role has more reservations (which count towards |
| // consumed quota) than quota guarantee, we don't need to hold back any |
| // unreserved headroom for it. |
| Resources requiredHeadroom; |
| foreachpair (const string& role, const Quota& quota, quotas) { |
| // We can safely subtract resources without checking inclusion. If the |
| // minuend resource is less than the subtrahend resource, the result is an |
| // empty resource. |
| requiredHeadroom += Resources(quota.info.guarantee()) - |
| rolesConsumedQuotaScalarQuantites.get(role).getOrElse(Resources()); |
| } |
| |
| // We will allocate resources while ensuring that the required |
| // unreserved non-revocable headroom is still available. Otherwise, |
| // we will not be able to satisfy the quota guarantee later. |
| // |
| // available headroom = unallocated unreserved non-revocable resources |
| // |
| // We compute this as: |
| // |
| // available headroom = total resources - |
| // allocated resources - |
| // unallocated reservations - |
| // unallocated revocable resources |
| Resources availableHeadroom = totalStrippedScalars; |
| |
| // NOTE: The role sorter does not return aggregated allocation |
| // information whereas `reservationScalarQuantities` does, so |
| // we need to loop over only top level roles for the latter. |
| |
| // Subtract allocated resources from the total. |
| foreachkey (const string& role, roles) { |
| availableHeadroom -= roleSorter->allocationScalarQuantities(role); |
| } |
| |
| // Calculate total allocated reservations. Note that we need to ensure |
| // we count a reservation for "a" being allocated to "a/b", therefore |
| // we cannot simply loop over the reservations' roles. |
| Resources totalAllocatedReservationScalarQuantities; |
| foreachkey (const string& role, roles) { |
| const hashmap<SlaveID, Resources>* allocations; |
| if (quotaRoleSorter->contains(role)) { |
| allocations = &quotaRoleSorter->allocation(role); |
| } else if (roleSorter->contains(role)) { |
| allocations = &roleSorter->allocation(role); |
| } else { |
| continue; // This role has no allocation. |
| } |
| |
| foreachvalue (const Resources& resources, *CHECK_NOTNULL(allocations)) { |
| totalAllocatedReservationScalarQuantities += |
| resources.reserved().createStrippedScalarQuantity(); |
| } |
| } |
| |
| // Subtract total unallocated reservations. |
| availableHeadroom -= |
| Resources::sum(reservationScalarQuantities) - |
| totalAllocatedReservationScalarQuantities; |
| |
| // Subtract revocable resources. |
| foreachvalue (const Slave& slave, slaves) { |
| availableHeadroom -= |
| slave.getAvailable().revocable().createStrippedScalarQuantity(); |
| } |
| |
| if (!quotas.empty()) { |
| LOG(INFO) << "Before allocation, required quota headroom is " |
| << requiredHeadroom |
| << " and available quota headroom is " << availableHeadroom; |
| } |
| |
| // Due to the two stages in the allocation algorithm and the nature of |
| // shared resources being re-offerable even if already allocated, the |
| // same shared resources can appear in two (and not more due to the |
| // `allocatable` check in each stage) distinct offers in one allocation |
| // cycle. This is undesirable since the allocator API contract should |
| // not depend on its implementation details. For now we make sure a |
| // shared resource is only allocated once in one offer cycle. We use |
| // `offeredSharedResources` to keep track of shared resources already |
| // allocated in the current cycle. |
| hashmap<SlaveID, Resources> offeredSharedResources; |
| |
| // Quota guarantee comes first and bursting above the quota guarantee |
| // up to the quota limit comes second. Here we process only those |
| // roles that have a non-empty quota guarantee. |
| // |
| // NOTE: Even though we keep track of the available headroom, we still |
| // dedicate the first stage to satisfy role's quota guarantee. The reason |
| // is that quota guarantee headroom only acts as a quantity guarantee. |
| // Frameworks might have filters or capabilities such that those resources |
| // set aside for the headroom cannot be used by these frameworks, resulting |
| // in unsatisfied quota guarantee (despite enough quota headroom). Thus |
| // we try to satisfy the quota guarantee in this first stage so that those |
| // roles with unsatisfied guarantee can have more choices and higher |
| // probability in getting their guarantee satisfied. |
| foreach (const SlaveID& slaveId, slaveIds) { |
| CHECK(slaves.contains(slaveId)); |
| Slave& slave = slaves.at(slaveId); |
| |
| foreach (const string& role, quotaRoleSorter->sort()) { |
| CHECK(quotas.contains(role)); |
| |
| const Quota& quota = quotas.at(role); |
| |
| // If there are no active frameworks in this role, we do not |
| // need to do any allocations for this role. |
| if (!roles.contains(role)) { |
| continue; |
| } |
| |
| // TODO(bmahler): Handle shared volumes, which are always available but |
| // should be excluded here based on `offeredSharedResources`. |
| if (!allocatable(slave.getAvailable())) { |
| break; // Nothing left on this agent. |
| } |
| |
| // Fetch frameworks according to their fair share. |
| // NOTE: Suppressed frameworks are not included in the sort. |
| CHECK(frameworkSorters.contains(role)); |
| const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); |
| |
| foreach (const string& frameworkId_, frameworkSorter->sort()) { |
| Resources available = slave.getAvailable(); |
| |
| // Offer a shared resource only if it has not been offered in this |
| // offer cycle to a framework. |
| available -= offeredSharedResources.get(slaveId).getOrElse(Resources()); |
| |
| if (!allocatable(available.allocatableTo(role))) { |
| break; // Nothing left for the role. |
| } |
| |
| FrameworkID frameworkId; |
| frameworkId.set_value(frameworkId_); |
| |
| CHECK(frameworks.contains(frameworkId)); |
| |
| const Framework& framework = frameworks.at(frameworkId); |
| CHECK(framework.active) << frameworkId; |
| |
| if (!isCapableOfReceivingAgent(framework.capabilities, slave)) { |
| continue; |
| } |
| |
| available = stripIncapableResources(available, framework.capabilities); |
| |
| // In this first stage, we allocate the role's reservations as well as |
| // any unreserved resources while ensuring the role stays within its |
| // quota guarantee. This means that we'll "chop" the unreserved |
| // resources up to the quota guarantee if necessary. |
| // |
| // E.g. A role has no allocations or reservations yet and a 10 cpu |
| // quota limit. We'll chop a 15 cpu agent down to only |
| // allocate 10 cpus to the role to keep it within its guarantee. |
| // |
| // In the case that the role needs some of the resources on this |
| // agent to make progress towards its quota, or the role is being |
| // allocated some reservation(s), we'll *also* allocate all of |
| // the resources for which it does not have quota guarantee. |
| // |
| // E.g. The agent has 1 cpu, 1024 mem, 1024 disk, 1 gpu, 5 ports |
| // and the role has quota for 1 cpu, 1024 mem. We'll include |
| // the disk, gpu, and ports in the allocation, despite the |
| // role not having any quota guarantee for them. |
| // |
| // We have to do this for now because it's not possible to set |
| // quota on non-scalar resources, like ports. For scalar resources |
| // that this role has no quota for, it can be allocated as long |
| // as the quota headroom is not violated. |
| // |
| // TODO(mzhu): Since we're treating the resources with unset |
| // quota as having no guarantee and no limit, these should |
| // also be allocated further in the second allocation "phase" |
| // below (above guarantee up to limit). |
| |
| // NOTE: Currently, frameworks are allowed to have '*' role. |
| // Calling reserved('*') returns an empty Resources object. |
| // |
| // NOTE: Since we currently only support top-level roles to |
| // have quota, there are no ancestor reservations involved here. |
| Resources toAllocate = available.reserved(role).nonRevocable(); |
| |
| // This is a scalar quantity with no meta-data. |
| Resources unsatisfiedQuotaGuarantee = |
| Resources(quota.info.guarantee()) - |
| rolesConsumedQuotaScalarQuantites.get(role).getOrElse(Resources()); |
| |
| Resources unreserved = available.nonRevocable().unreserved(); |
| |
| // First, allocate resources up to a role's quota guarantee. |
| |
| hashmap<string, Value::Scalar> unsatisfiedQuotaGuaranteeScalarLimit; |
| foreach (const string& name, unsatisfiedQuotaGuarantee.names()) { |
| unsatisfiedQuotaGuaranteeScalarLimit[name] += |
| CHECK_NOTNONE(unsatisfiedQuotaGuarantee.get<Value::Scalar>(name)); |
| } |
| |
| Resources newQuotaAllocation = |
| unreserved.filter([&](const Resource& resource) { |
| return |
| unsatisfiedQuotaGuaranteeScalarLimit.contains(resource.name()); |
| }); |
| |
| newQuotaAllocation = shrinkResources(newQuotaAllocation, |
| unsatisfiedQuotaGuaranteeScalarLimit); |
| |
| toAllocate += newQuotaAllocation; |
| |
| // We only include the non-quota guarantee resources (with headroom |
| // taken into account) if this role is getting any other resources |
| // as well i.e. it is getting either some quota guarantee resources or |
| // a reservation. Otherwise, this role is not going to get any |
| // allocation. We can safely `continue` here. |
| if (toAllocate.empty()) { |
| continue; |
| } |
| |
| // Second, allocate scalar resources with unset quota while maintaining |
| // the quota headroom. |
| |
| set<string> quotaGuaranteeResourceNames = |
| Resources(quota.info.guarantee()).names(); |
| |
| Resources nonQuotaGuaranteeResources = |
| unreserved.filter( |
| ["aGuaranteeResourceNames] (const Resource& resource) { |
| return quotaGuaranteeResourceNames.count(resource.name()) == 0; |
| } |
| ); |
| |
| // Allocation Limit = Available Headroom - Required Headroom |
| Resources headroomResourcesLimit = availableHeadroom - requiredHeadroom; |
| |
| hashmap<string, Value::Scalar> headroomScalarLimit; |
| foreach (const string& name, headroomResourcesLimit.names()) { |
| headroomScalarLimit[name] = |
| CHECK_NOTNONE(headroomResourcesLimit.get<Value::Scalar>(name)); |
| } |
| |
| // If a resource type is absent in `headroomScalarLimit`, it means this |
| // type of resource is already in quota headroom deficit and we make |
| // no more allocations. They are filtered out. |
| nonQuotaGuaranteeResources = nonQuotaGuaranteeResources.filter( |
| [&] (const Resource& resource) { |
| return headroomScalarLimit.contains(resource.name()); |
| } |
| ); |
| |
| nonQuotaGuaranteeResources = |
| shrinkResources(nonQuotaGuaranteeResources, headroomScalarLimit); |
| |
| toAllocate += nonQuotaGuaranteeResources; |
| |
| // Lastly, allocate non-scalar resources--we currently do not support |
| // setting quota for non-scalar resources. They are always allocated |
| // in full. |
| toAllocate += |
| unreserved.filter([] (const Resource& resource) { |
| return resource.type() != Value::SCALAR; |
| }); |
| |
| // If the framework filters these resources, ignore. |
| if (!allocatable(toAllocate) || |
| isFiltered(frameworkId, role, slaveId, toAllocate)) { |
| continue; |
| } |
| |
| VLOG(2) << "Allocating " << toAllocate << " on agent " << slaveId |
| << " to role " << role << " of framework " << frameworkId |
| << " as part of its role quota"; |
| |
| toAllocate.allocate(role); |
| |
| offerable[frameworkId][role][slaveId] += toAllocate; |
| offeredSharedResources[slaveId] += toAllocate.shared(); |
| |
| Resources allocatedUnreserved = |
| toAllocate.unreserved().createStrippedScalarQuantity(); |
| |
| // Update role consumed quota. |
| rolesConsumedQuotaScalarQuantites[role] += allocatedUnreserved; |
| |
| // Track quota guarantee headroom change. |
| |
| // `requiredHeadroom` counts total unsatisfied quota guarantee. Thus |
| // only the part of the allocated resources that satisfy some of the |
| // role's guarantee should be subtracted. Allocation of reserved |
| // resources or resources that this role has unset guarantee do not |
| // affect `requiredHeadroom`. |
| requiredHeadroom -= newQuotaAllocation.createStrippedScalarQuantity(); |
| |
| // `availableHeadroom` counts total unreserved non-revocable resources |
| // in the cluster. |
| availableHeadroom -= allocatedUnreserved; |
| |
| slave.allocate(toAllocate); |
| |
| trackAllocatedResources(slaveId, frameworkId, toAllocate); |
| } |
| } |
| } |
| |
| // Similar to the first stage, we will allocate resources while ensuring |
| // that the required unreserved non-revocable headroom is still available |
| // for unsatisfied quota guarantees. Otherwise, we will not be able to |
| // satisfy quota guarantees later. Reservations to non-quota roles and |
| // revocable resources will always be included in the offers since these |
| // are not part of the headroom (and therefore can't be used to satisfy |
| // quota guarantees). |
| |
| // For logging purposes, we track the number of agents that had resources |
| // held back for quota headroom, as well as how many resources in total |
| // were held back. |
| // |
| // While we also held resources back for quota headroom in the first stage, |
| // we do not track it there. This is because in the second stage, we try to |
| // allocate all resources (including the ones held back in the first stage). |
| // Thus only resources held back in the second stage are truly held back for |
| // the whole allocation cycle. |
| Resources heldBackForHeadroom; |
| size_t heldBackAgentCount = 0; |
| |
| foreach (const SlaveID& slaveId, slaveIds) { |
| CHECK(slaves.contains(slaveId)); |
| Slave& slave = slaves.at(slaveId); |
| |
| foreach (const string& role, roleSorter->sort()) { |
| // In the second allocation stage, we only allocate |
| // for non-quota roles. |
| if (quotas.contains(role)) { |
| continue; |
| } |
| |
| // TODO(bmahler): Handle shared volumes, which are always available but |
| // should be excluded here based on `offeredSharedResources`. |
| if (!allocatable(slave.getAvailable())) { |
| break; // Nothing left on this agent. |
| } |
| |
| // NOTE: Suppressed frameworks are not included in the sort. |
| CHECK(frameworkSorters.contains(role)); |
| const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role); |
| |
| foreach (const string& frameworkId_, frameworkSorter->sort()) { |
| Resources available = slave.getAvailable(); |
| |
| // Offer a shared resource only if it has not been offered in this |
| // offer cycle to a framework. |
| available -= offeredSharedResources.get(slaveId).getOrElse(Resources()); |
| |
| if (!allocatable(available.allocatableTo(role))) { |
| break; // Nothing left for the role. |
| } |
| |
| FrameworkID frameworkId; |
| frameworkId.set_value(frameworkId_); |
| |
| CHECK(frameworks.contains(frameworkId)); |
| |
| const Framework& framework = frameworks.at(frameworkId); |
| |
| if (!isCapableOfReceivingAgent(framework.capabilities, slave)) { |
| continue; |
| } |
| |
| available = stripIncapableResources(available, framework.capabilities); |
| |
| // The resources we offer are the unreserved resources as well as the |
| // reserved resources for this particular role and all its ancestors |
| // in the role hierarchy. |
| // |
| // NOTE: Currently, frameworks are allowed to have '*' role. |
| // Calling reserved('*') returns an empty Resources object. |
| // |
| // TODO(mpark): Offer unreserved resources as revocable beyond quota. |
| Resources toAllocate = available.allocatableTo(role); |
| |
| // If allocating these resources would reduce the headroom |
| // below what is required, we will hold them back. |
| const Resources headroomToAllocate = toAllocate |
| .scalars().unreserved().nonRevocable(); |
| |
| bool sufficientHeadroom = |
| (availableHeadroom - |
| headroomToAllocate.createStrippedScalarQuantity()) |
| .contains(requiredHeadroom); |
| |
| if (!sufficientHeadroom) { |
| toAllocate -= headroomToAllocate; |
| heldBackForHeadroom += headroomToAllocate; |
| ++heldBackAgentCount; |
| } |
| |
| if (!allocatable(toAllocate) || |
| isFiltered(frameworkId, role, slaveId, toAllocate)) { |
| continue; |
| } |
| |
| VLOG(2) << "Allocating " << toAllocate << " on agent " << slaveId |
| << " to role " << role << " of framework " << frameworkId; |
| |
| toAllocate.allocate(role); |
| |
| // NOTE: We perform "coarse-grained" allocation, meaning that we always |
| // allocate the entire remaining slave resources to a single framework. |
| offerable[frameworkId][role][slaveId] += toAllocate; |
| offeredSharedResources[slaveId] += toAllocate.shared(); |
| |
| if (sufficientHeadroom) { |
| availableHeadroom -= |
| headroomToAllocate.createStrippedScalarQuantity(); |
| } |
| |
| slave.allocate(toAllocate); |
| |
| trackAllocatedResources(slaveId, frameworkId, toAllocate); |
| } |
| } |
| } |
| |
| if (!quotas.empty()) { |
| LOG(INFO) << "After allocation, " << requiredHeadroom |
| << " are required for quota headroom, " |
| << heldBackForHeadroom << " were held back from " |
| << heldBackAgentCount |
| << " agents to ensure sufficient quota headroom"; |
| } |
| |
| if (offerable.empty()) { |
| VLOG(2) << "No allocations performed"; |
| } else { |
| // Now offer the resources to each framework. |
| foreachkey (const FrameworkID& frameworkId, offerable) { |
| offerCallback(frameworkId, offerable.at(frameworkId)); |
| } |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::deallocate() |
| { |
| // If no frameworks are currently registered, no work to do. |
| if (roles.empty()) { |
| return; |
| } |
| CHECK(!frameworkSorters.empty()); |
| |
| // In this case, `offerable` is actually the slaves and/or resources that we |
| // want the master to create `InverseOffer`s from. |
| hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable; |
| |
| // For maintenance, we use the framework sorters to determine which frameworks |
| // have (1) reserved and / or (2) unreserved resource on the specified |
| // slaveIds. This way we only send inverse offers to frameworks that have the |
| // potential to lose something. We keep track of which frameworks already have |
| // an outstanding inverse offer for the given slave in the |
| // UnavailabilityStatus of the specific slave using the `offerOutstanding` |
| // flag. This is equivalent to the accounting we do for resources when we send |
| // regular offers. If we didn't keep track of outstanding offers then we would |
| // keep generating new inverse offers even though the framework had not |
| // responded yet. |
| |
| foreachvalue (const Owned<Sorter>& frameworkSorter, frameworkSorters) { |
| foreach (const SlaveID& slaveId, allocationCandidates) { |
| CHECK(slaves.contains(slaveId)); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| if (slave.maintenance.isSome()) { |
| // We use a reference by alias because we intend to modify the |
| // `maintenance` and to improve readability. |
| Slave::Maintenance& maintenance = slave.maintenance.get(); |
| |
| hashmap<string, Resources> allocation = |
| frameworkSorter->allocation(slaveId); |
| |
| foreachkey (const string& frameworkId_, allocation) { |
| FrameworkID frameworkId; |
| frameworkId.set_value(frameworkId_); |
| |
| CHECK(frameworks.contains(frameworkId)) << frameworkId; |
| |
| const Framework& framework = frameworks.at(frameworkId); |
| |
| // No need to deallocate for an inactive framework as the master |
| // will not send it inverse offers. |
| if (!framework.active) { |
| continue; |
| } |
| |
| // If this framework doesn't already have inverse offers for the |
| // specified slave. |
| if (!offerable[frameworkId].contains(slaveId)) { |
| // If there isn't already an outstanding inverse offer to this |
| // framework for the specified slave. |
| if (!maintenance.offersOutstanding.contains(frameworkId)) { |
| // Ignore in case the framework filters inverse offers for this |
| // slave. |
| // |
| // NOTE: Since this specific allocator implementation only sends |
| // inverse offers for maintenance primitives, and those are at the |
| // whole slave level, we only need to filter based on the |
| // time-out. |
| if (isFiltered(frameworkId, slaveId)) { |
| continue; |
| } |
| |
| const UnavailableResources unavailableResources = |
| UnavailableResources{ |
| Resources(), |
| maintenance.unavailability}; |
| |
| // For now we send inverse offers with empty resources when the |
| // inverse offer represents maintenance on the machine. In the |
| // future we could be more specific about the resources on the |
| // host, as we have the information available. |
| offerable[frameworkId][slaveId] = unavailableResources; |
| |
| // Mark this framework as having an offer outstanding for the |
| // specified slave. |
| maintenance.offersOutstanding.insert(frameworkId); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| if (offerable.empty()) { |
| VLOG(2) << "No inverse offers to send out!"; |
| } else { |
| // Now send inverse offers to each framework. |
| foreachkey (const FrameworkID& frameworkId, offerable) { |
| inverseOfferCallback(frameworkId, offerable[frameworkId]); |
| } |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::_expire( |
| const FrameworkID& frameworkId, |
| const string& role, |
| const SlaveID& slaveId, |
| const weak_ptr<OfferFilter>& offerFilter) |
| { |
| // The filter might have already been removed (e.g., if the |
| // framework no longer exists or in `reviveOffers()`) but |
| // we may land here if the cancelation of the expiry timeout |
| // did not succeed (due to the dispatch already being in the |
| // queue). |
| shared_ptr<OfferFilter> filter = offerFilter.lock(); |
| |
| if (filter.get() == nullptr) { |
| return; |
| } |
| |
| // Since this is a performance-sensitive piece of code, |
| // we use find to avoid the doing any redundant lookups. |
| auto frameworkIterator = frameworks.find(frameworkId); |
| CHECK(frameworkIterator != frameworks.end()); |
| |
| Framework& framework = frameworkIterator->second; |
| |
| auto roleFilters = framework.offerFilters.find(role); |
| CHECK(roleFilters != framework.offerFilters.end()); |
| |
| auto agentFilters = roleFilters->second.find(slaveId); |
| CHECK(agentFilters != roleFilters->second.end()); |
| |
| // Erase the filter (may be a no-op per the comment above). |
| agentFilters->second.erase(filter); |
| if (agentFilters->second.empty()) { |
| roleFilters->second.erase(slaveId); |
| } |
| if (roleFilters->second.empty()) { |
| framework.offerFilters.erase(role); |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::expire( |
| const FrameworkID& frameworkId, |
| const string& role, |
| const SlaveID& slaveId, |
| const weak_ptr<OfferFilter>& offerFilter) |
| { |
| dispatch( |
| self(), |
| &Self::_expire, |
| frameworkId, |
| role, |
| slaveId, |
| offerFilter); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::expire( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId, |
| const weak_ptr<InverseOfferFilter>& inverseOfferFilter) |
| { |
| // The filter might have already been removed (e.g., if the |
| // framework no longer exists or in |
| // HierarchicalAllocatorProcess::reviveOffers) but |
| // we may land here if the cancelation of the expiry timeout |
| // did not succeed (due to the dispatch already being in the |
| // queue). |
| shared_ptr<InverseOfferFilter> filter = inverseOfferFilter.lock(); |
| |
| if (filter.get() == nullptr) { |
| return; |
| } |
| |
| // Since this is a performance-sensitive piece of code, |
| // we use find to avoid the doing any redundant lookups. |
| auto frameworkIterator = frameworks.find(frameworkId); |
| CHECK(frameworkIterator != frameworks.end()); |
| |
| Framework& framework = frameworkIterator->second; |
| |
| auto filters = framework.inverseOfferFilters.find(slaveId); |
| CHECK(filters != framework.inverseOfferFilters.end()); |
| |
| filters->second.erase(filter); |
| if (filters->second.empty()) { |
| framework.inverseOfferFilters.erase(slaveId); |
| } |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isWhitelisted( |
| const SlaveID& slaveId) const |
| { |
| CHECK(slaves.contains(slaveId)); |
| |
| const Slave& slave = slaves.at(slaveId); |
| |
| return whitelist.isNone() || whitelist->contains(slave.info.hostname()); |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isFiltered( |
| const FrameworkID& frameworkId, |
| const string& role, |
| const SlaveID& slaveId, |
| const Resources& resources) const |
| { |
| CHECK(frameworks.contains(frameworkId)); |
| CHECK(slaves.contains(slaveId)); |
| |
| const Framework& framework = frameworks.at(frameworkId); |
| const Slave& slave = slaves.at(slaveId); |
| |
| // TODO(mpark): Consider moving these filter logic out and into the master, |
| // since they are not specific to the hierarchical allocator but rather are |
| // global allocation constraints. |
| |
| // Prevent offers from non-MULTI_ROLE agents to be allocated |
| // to MULTI_ROLE frameworks. |
| if (framework.capabilities.multiRole && |
| !slave.capabilities.multiRole) { |
| LOG(WARNING) << "Implicitly filtering agent " << slaveId |
| << " from framework " << frameworkId |
| << " because the framework is MULTI_ROLE capable" |
| << " but the agent is not"; |
| |
| return true; |
| } |
| |
| // Prevent offers from non-HIERARCHICAL_ROLE agents to be allocated |
| // to hierarchical roles. |
| if (!slave.capabilities.hierarchicalRole && strings::contains(role, "/")) { |
| LOG(WARNING) << "Implicitly filtering agent " << slaveId |
| << " from role " << role |
| << " because the role is hierarchical but the agent is not" |
| << " HIERARCHICAL_ROLE capable"; |
| |
| return true; |
| } |
| |
| // Since this is a performance-sensitive piece of code, |
| // we use find to avoid the doing any redundant lookups. |
| auto roleFilters = framework.offerFilters.find(role); |
| if (roleFilters == framework.offerFilters.end()) { |
| return false; |
| } |
| |
| auto agentFilters = roleFilters->second.find(slaveId); |
| if (agentFilters == roleFilters->second.end()) { |
| return false; |
| } |
| |
| foreach (const shared_ptr<OfferFilter>& offerFilter, agentFilters->second) { |
| if (offerFilter->filter(resources)) { |
| VLOG(1) << "Filtered offer with " << resources |
| << " on agent " << slaveId |
| << " for role " << role |
| << " of framework " << frameworkId; |
| |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isFiltered( |
| const FrameworkID& frameworkId, |
| const SlaveID& slaveId) const |
| { |
| CHECK(frameworks.contains(frameworkId)); |
| CHECK(slaves.contains(slaveId)); |
| |
| const Framework& framework = frameworks.at(frameworkId); |
| |
| if (framework.inverseOfferFilters.contains(slaveId)) { |
| foreach (const shared_ptr<InverseOfferFilter>& inverseOfferFilter, |
| framework.inverseOfferFilters.at(slaveId)) { |
| if (inverseOfferFilter->filter()) { |
| VLOG(1) << "Filtered unavailability on agent " << slaveId |
| << " for framework " << frameworkId; |
| |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::allocatable(const Resources& resources) |
| { |
| if (minAllocatableResources.isNone() || |
| CHECK_NOTNONE(minAllocatableResources).empty()) { |
| return true; |
| } |
| |
| foreach ( |
| const ResourceQuantities& resourceQuantities, |
| CHECK_NOTNONE(minAllocatableResources)) { |
| if (resources.contains(resourceQuantities)) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_resources_offered_or_allocated( |
| const string& resource) |
| { |
| double offered_or_allocated = 0; |
| |
| foreachvalue (const Slave& slave, slaves) { |
| Option<Value::Scalar> value = |
| slave.getAllocated().get<Value::Scalar>(resource); |
| |
| if (value.isSome()) { |
| offered_or_allocated += value->value(); |
| } |
| } |
| |
| return offered_or_allocated; |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_resources_total( |
| const string& resource) |
| { |
| Option<Value::Scalar> total = |
| totalStrippedScalars.get<Value::Scalar>(resource); |
| |
| return total.isSome() ? total->value() : 0; |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_quota_allocated( |
| const string& role, |
| const string& resource) |
| { |
| if (!roleSorter->contains(role)) { |
| // This can occur when execution of this callback races with removal of the |
| // metric for a role which does not have any associated frameworks. |
| return 0.; |
| } |
| |
| Option<Value::Scalar> used = |
| roleSorter->allocationScalarQuantities(role) |
| .get<Value::Scalar>(resource); |
| |
| return used.isSome() ? used->value() : 0; |
| } |
| |
| |
| double HierarchicalAllocatorProcess::_offer_filters_active( |
| const string& role) |
| { |
| double result = 0; |
| |
| foreachvalue (const Framework& framework, frameworks) { |
| if (!framework.offerFilters.contains(role)) { |
| continue; |
| } |
| |
| foreachkey (const SlaveID& slaveId, framework.offerFilters.at(role)) { |
| result += framework.offerFilters.at(role).at(slaveId).size(); |
| } |
| } |
| |
| return result; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole( |
| const FrameworkID& frameworkId, |
| const string& role) const |
| { |
| return roles.contains(role) && |
| roles.at(role).contains(frameworkId); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::trackFrameworkUnderRole( |
| const FrameworkID& frameworkId, |
| const string& role) |
| { |
| CHECK(initialized); |
| |
| // If this is the first framework to subscribe to this role, or have |
| // resources allocated to this role, initialize state as necessary. |
| if (!roles.contains(role)) { |
| roles[role] = {}; |
| CHECK(!roleSorter->contains(role)); |
| roleSorter->add(role); |
| roleSorter->activate(role); |
| |
| CHECK(!frameworkSorters.contains(role)); |
| frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())}); |
| frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames); |
| |
| foreachvalue (const Slave& slave, slaves) { |
| frameworkSorters.at(role)->addSlave( |
| slave.info.id(), |
| ResourceQuantities::fromScalarResources(slave.getTotal().scalars())); |
| } |
| |
| metrics.addRole(role); |
| } |
| |
| CHECK(!roles.at(role).contains(frameworkId)); |
| roles.at(role).insert(frameworkId); |
| |
| CHECK(!frameworkSorters.at(role)->contains(frameworkId.value())); |
| frameworkSorters.at(role)->add(frameworkId.value()); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::untrackFrameworkUnderRole( |
| const FrameworkID& frameworkId, |
| const string& role) |
| { |
| CHECK(initialized); |
| |
| CHECK(roles.contains(role)); |
| CHECK(roles.at(role).contains(frameworkId)); |
| CHECK(frameworkSorters.contains(role)); |
| CHECK(frameworkSorters.at(role)->contains(frameworkId.value())); |
| |
| roles.at(role).erase(frameworkId); |
| frameworkSorters.at(role)->remove(frameworkId.value()); |
| |
| // If no more frameworks are subscribed to this role or have resources |
| // allocated to this role, cleanup associated state. This is not necessary |
| // for correctness (roles with no registered frameworks will not be offered |
| // any resources), but since many different role names might be used over |
| // time, we want to avoid leaking resources for no-longer-used role names. |
| // Note that we don't remove the role from `quotaRoleSorter` if it exists |
| // there, since roles with a quota set still influence allocation even if |
| // they don't have any registered frameworks. |
| |
| if (roles.at(role).empty()) { |
| CHECK_EQ(frameworkSorters.at(role)->count(), 0u); |
| |
| roles.erase(role); |
| roleSorter->remove(role); |
| |
| frameworkSorters.erase(role); |
| |
| metrics.removeRole(role); |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::trackReservations( |
| const hashmap<std::string, Resources>& reservations) |
| { |
| foreachpair (const string& role, |
| const Resources& resources, reservations) { |
| const Resources scalarQuantitesToTrack = |
| resources.createStrippedScalarQuantity(); |
| |
| if (scalarQuantitesToTrack.empty()) { |
| continue; // Do not insert an empty entry. |
| } |
| |
| reservationScalarQuantities[role] += scalarQuantitesToTrack; |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::untrackReservations( |
| const hashmap<std::string, Resources>& reservations) |
| { |
| foreachpair (const string& role, |
| const Resources& resources, reservations) { |
| const Resources scalarQuantitesToUntrack = |
| resources.createStrippedScalarQuantity(); |
| |
| if (scalarQuantitesToUntrack.empty()) { |
| continue; // Do not CHECK for the role if there's nothing to untrack. |
| } |
| |
| CHECK(reservationScalarQuantities.contains(role)); |
| Resources& currentReservationQuantity = |
| reservationScalarQuantities.at(role); |
| |
| CHECK(currentReservationQuantity.contains(scalarQuantitesToUntrack)); |
| currentReservationQuantity -= scalarQuantitesToUntrack; |
| |
| if (currentReservationQuantity.empty()) { |
| reservationScalarQuantities.erase(role); |
| } |
| } |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::updateSlaveTotal( |
| const SlaveID& slaveId, |
| const Resources& total) |
| { |
| CHECK(slaves.contains(slaveId)); |
| |
| Slave& slave = slaves.at(slaveId); |
| |
| const Resources oldTotal = slave.getTotal(); |
| |
| if (oldTotal == total) { |
| return false; |
| } |
| |
| slave.updateTotal(total); |
| |
| hashmap<std::string, Resources> oldReservations = oldTotal.reservations(); |
| hashmap<std::string, Resources> newReservations = total.reservations(); |
| |
| if (oldReservations != newReservations) { |
| untrackReservations(oldReservations); |
| trackReservations(newReservations); |
| } |
| |
| // Update the total in the allocator and totals in the sorters. |
| const Resources oldStrippedScalars = oldTotal.createStrippedScalarQuantity(); |
| const Resources strippedScalars = total.createStrippedScalarQuantity(); |
| |
| const ResourceQuantities oldAgentScalarQuantities = |
| ResourceQuantities::fromScalarResources(oldStrippedScalars); |
| |
| const ResourceQuantities agentScalarQuantities = |
| ResourceQuantities::fromScalarResources(strippedScalars); |
| |
| CHECK(totalStrippedScalars.contains(oldStrippedScalars)); |
| totalStrippedScalars -= oldStrippedScalars; |
| totalStrippedScalars += strippedScalars; |
| |
| roleSorter->removeSlave(slaveId); |
| roleSorter->addSlave(slaveId, agentScalarQuantities); |
| |
| foreachvalue (const Owned<Sorter>& sorter, frameworkSorters) { |
| sorter->removeSlave(slaveId); |
| sorter->addSlave(slaveId, agentScalarQuantities); |
| } |
| |
| // See comment at `quotaRoleSorter` declaration regarding non-revocable. |
| quotaRoleSorter->removeSlave(slaveId); |
| quotaRoleSorter->addSlave( |
| slaveId, |
| ResourceQuantities::fromScalarResources(total.nonRevocable().scalars())); |
| |
| return true; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const |
| { |
| // If the slave does not have a configured domain, assume it is not remote. |
| if (!slave.info.has_domain()) { |
| return false; |
| } |
| |
| // The current version of the Mesos agent refuses to startup if a |
| // domain is specified without also including a fault domain. That |
| // might change in the future, if more types of domains are added. |
| // For forward compatibility, we treat agents with a configured |
| // domain but no fault domain as having no configured domain. |
| if (!slave.info.domain().has_fault_domain()) { |
| return false; |
| } |
| |
| // If the slave has a configured domain (and it has been allowed to |
| // register with the master), the master must also have a configured |
| // domain. |
| CHECK(domain.isSome()); |
| |
| // The master will not startup if configured with a domain but no |
| // fault domain. |
| CHECK(domain->has_fault_domain()); |
| |
| const DomainInfo::FaultDomain::RegionInfo& masterRegion = |
| domain->fault_domain().region(); |
| const DomainInfo::FaultDomain::RegionInfo& slaveRegion = |
| slave.info.domain().fault_domain().region(); |
| |
| return masterRegion != slaveRegion; |
| } |
| |
| |
| bool HierarchicalAllocatorProcess::isCapableOfReceivingAgent( |
| const protobuf::framework::Capabilities& frameworkCapabilities, |
| const Slave& slave) const |
| { |
| // Only offer resources from slaves that have GPUs to |
| // frameworks that are capable of receiving GPUs. |
| // See MESOS-5634. |
| if (filterGpuResources && !frameworkCapabilities.gpuResources && |
| slave.getTotal().gpus().getOrElse(0) > 0) { |
| return false; |
| } |
| |
| // If this framework is not region-aware, don't offer it |
| // resources on agents in remote regions. |
| if (!frameworkCapabilities.regionAware && isRemoteSlave(slave)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| |
| Resources HierarchicalAllocatorProcess::stripIncapableResources( |
| const Resources& resources, |
| const protobuf::framework::Capabilities& frameworkCapabilities) const |
| { |
| return resources.filter([&](const Resource& resource) { |
| if (!frameworkCapabilities.sharedResources && |
| Resources::isShared(resource)) { |
| return false; |
| } |
| |
| if (!frameworkCapabilities.revocableResources && |
| Resources::isRevocable(resource)) { |
| return false; |
| } |
| |
| // When reservation refinements are present, old frameworks without the |
| // RESERVATION_REFINEMENT capability won't be able to understand the |
| // new format. While it's possible to translate the refined reservations |
| // into the old format by "hiding" the intermediate reservations in the |
| // "stack", this leads to ambiguity when processing RESERVE / UNRESERVE |
| // operations. This is due to the loss of information when we drop the |
| // intermediate reservations. Therefore, for now we simply filter out |
| // resources with refined reservations if the framework does not have |
| // the capability. |
| if (!frameworkCapabilities.reservationRefinement && |
| Resources::hasRefinedReservations(resource)) { |
| return false; |
| } |
| |
| return true; |
| }); |
| } |
| |
| |
| void HierarchicalAllocatorProcess::trackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& allocated) |
| { |
| CHECK(slaves.contains(slaveId)); |
| CHECK(frameworks.contains(frameworkId)); |
| |
| // TODO(bmahler): Calling allocations() is expensive since it has |
| // to construct a map. Avoid this. |
| foreachpair (const string& role, |
| const Resources& allocation, |
| allocated.allocations()) { |
| // The framework has resources allocated to this role but it may |
| // or may not be subscribed to the role. Either way, we need to |
| // track the framework under the role. |
| if (!isFrameworkTrackedUnderRole(frameworkId, role)) { |
| trackFrameworkUnderRole(frameworkId, role); |
| } |
| |
| CHECK(roleSorter->contains(role)); |
| CHECK(frameworkSorters.contains(role)); |
| CHECK(frameworkSorters.at(role)->contains(frameworkId.value())); |
| |
| roleSorter->allocated(role, slaveId, allocation); |
| frameworkSorters.at(role)->allocated( |
| frameworkId.value(), slaveId, allocation); |
| |
| if (quotas.contains(role)) { |
| // See comment at `quotaRoleSorter` declaration regarding non-revocable. |
| quotaRoleSorter->allocated(role, slaveId, allocation.nonRevocable()); |
| } |
| } |
| } |
| |
| |
| void HierarchicalAllocatorProcess::untrackAllocatedResources( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const Resources& allocated) |
| { |
| // TODO(mzhu): Add a `CHECK(slaves.contains(slaveId));` |
| // here once MESOS-621 is resolved. Ideally, `removeSlave()` |
| // should unallocate resources in the framework sorters. |
| // But currently, a slave is removed first via `removeSlave()` |
| // and later a call to `recoverResources()` occurs to recover |
| // the framework's resources. |
| CHECK(frameworks.contains(frameworkId)); |
| |
| // TODO(bmahler): Calling allocations() is expensive since it has |
| // to construct a map. Avoid this. |
| foreachpair (const string& role, |
| const Resources& allocation, |
| allocated.allocations()) { |
| CHECK(roleSorter->contains(role)); |
| CHECK(frameworkSorters.contains(role)); |
| CHECK(frameworkSorters.at(role)->contains(frameworkId.value())); |
| |
| frameworkSorters.at(role)->unallocated( |
| frameworkId.value(), slaveId, allocation); |
| |
| roleSorter->unallocated(role, slaveId, allocation); |
| |
| if (quotas.contains(role)) { |
| // See comment at `quotaRoleSorter` declaration regarding non-revocable. |
| quotaRoleSorter->unallocated(role, slaveId, allocation.nonRevocable()); |
| } |
| } |
| } |
| |
| |
| } // namespace internal { |
| } // namespace allocator { |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |