// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "master/allocator/mesos/hierarchical.hpp"
#include <algorithm>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <mesos/resources.hpp>
#include <mesos/type_utils.hpp>
#include <process/after.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/event.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/timeout.hpp>
#include <stout/check.hpp>
#include <stout/hashset.hpp>
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
#include "common/protobuf_utils.hpp"
#include "common/resource_quantities.hpp"
using std::set;
using std::string;
using std::vector;
using mesos::allocator::InverseOfferStatus;
using process::after;
using process::Continue;
using process::ControlFlow;
using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
using process::PID;
using process::Timeout;
using mesos::internal::protobuf::framework::Capabilities;
namespace mesos {
namespace internal {
namespace master {
namespace allocator {
namespace internal {
// Used to represent "filters" for resources unused in offers.
class OfferFilter
{
public:
virtual ~OfferFilter() {}
virtual bool filter(const Resources& resources) const = 0;
};
class RefusedOfferFilter : public OfferFilter
{
public:
RefusedOfferFilter(const Resources& _resources) : resources(_resources) {}
virtual bool filter(const Resources& _resources) const
{
// TODO(jieyu): Consider separating the superset check for regular
// and revocable resources. For example, frameworks might want
// more revocable resources only or non-revocable resources only,
// but currently the filter only expires if there is more of both
// revocable and non-revocable resources.
return resources.contains(_resources); // Refused resources are a superset.
}
private:
const Resources resources;
};
// Used to represent "filters" for inverse offers.
//
// NOTE: Since this specific allocator implementation only sends inverse offers
// for maintenance primitives, and those are at the whole slave level, we only
// need to filter based on the timeout.
// If this allocator implementation starts sending out more resource-specific
// inverse offers, then we can capture the `unavailableResources` in the filter
// function.
class InverseOfferFilter
{
public:
virtual ~InverseOfferFilter() {}
virtual bool filter() const = 0;
};
// NOTE: See comment above `InverseOfferFilter` regarding capturing
// `unavailableResources` if this allocator starts sending fine-grained inverse
// offers.
class RefusedInverseOfferFilter : public InverseOfferFilter
{
public:
RefusedInverseOfferFilter(const Timeout& _timeout)
: timeout(_timeout) {}
virtual bool filter() const
{
// See the comment above for why we currently don't do more fine-grained filtering.
return timeout.remaining() > Seconds(0);
}
private:
const Timeout timeout;
};
HierarchicalAllocatorProcess::Framework::Framework(
const FrameworkInfo& frameworkInfo,
const set<string>& _suppressedRoles)
: roles(protobuf::framework::getRoles(frameworkInfo)),
suppressedRoles(_suppressedRoles),
capabilities(frameworkInfo.capabilities()) {}
void HierarchicalAllocatorProcess::initialize(
const Duration& _allocationInterval,
const lambda::function<
void(const FrameworkID&,
const hashmap<string, hashmap<SlaveID, Resources>>&)>&
_offerCallback,
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, UnavailableResources>&)>&
_inverseOfferCallback,
const Option<set<string>>& _fairnessExcludeResourceNames,
bool _filterGpuResources,
const Option<DomainInfo>& _domain,
const Option<std::vector<mesos::internal::ResourceQuantities>>&
_minAllocatableResources)
{
allocationInterval = _allocationInterval;
offerCallback = _offerCallback;
inverseOfferCallback = _inverseOfferCallback;
fairnessExcludeResourceNames = _fairnessExcludeResourceNames;
filterGpuResources = _filterGpuResources;
domain = _domain;
minAllocatableResources = _minAllocatableResources;
initialized = true;
paused = false;
// Resources for quota'ed roles are allocated separately and prior to
// non-quota'ed roles, hence a dedicated sorter for quota'ed roles is
// necessary.
roleSorter->initialize(fairnessExcludeResourceNames);
quotaRoleSorter->initialize(fairnessExcludeResourceNames);
VLOG(1) << "Initialized hierarchical allocator process";
// Start a loop to run allocation periodically.
PID<HierarchicalAllocatorProcess> _self = self();
loop(
None(), // Use `None` so we iterate outside the allocator process.
[_allocationInterval]() {
return after(_allocationInterval);
},
[_self](const Nothing&) {
return dispatch(_self, &HierarchicalAllocatorProcess::allocate)
.then([]() -> ControlFlow<Nothing> { return Continue(); });
});
}
void HierarchicalAllocatorProcess::recover(
const int _expectedAgentCount,
const hashmap<string, Quota>& quotas)
{
// Recovery should start before actual allocation starts.
CHECK(initialized);
CHECK(slaves.empty());
CHECK_EQ(0, quotaRoleSorter->count());
CHECK(_expectedAgentCount >= 0);
// If there is no quota, recovery is a no-op. Otherwise, we need
// to delay allocations while agents are re-registering because
// otherwise we perform allocations on a partial view of resources!
// We would consequently perform unnecessary allocations to satisfy
// quota constraints, which can over-allocate non-revocable resources
// to roles using quota. Then, frameworks in roles without quota can
// be unnecessarily deprived of resources. We may also be unable to
// satisfy all of the quota constraints. Repeated master failovers
// exacerbate the issue.
if (quotas.empty()) {
VLOG(1) << "Skipping recovery of hierarchical allocator: "
<< "nothing to recover";
return;
}
// NOTE: `quotaRoleSorter` is updated implicitly in `setQuota()`.
foreachpair (const string& role, const Quota& quota, quotas) {
setQuota(role, quota);
}
// TODO(alexr): Consider exposing these constants.
const Duration ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT = Minutes(10);
const double AGENT_RECOVERY_FACTOR = 0.8;
// Record the number of expected agents.
expectedAgentCount =
static_cast<int>(_expectedAgentCount * AGENT_RECOVERY_FACTOR);
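// E.g., with an `_expectedAgentCount` of 100 and a factor of 0.8, allocation
// resumes once 80 agents have reregistered (or once the recovery timer set
// below fires).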
// Skip recovery if there are no expected agents. This is not strictly
// necessary for the allocator to function correctly, but maps better
// to expected behavior by the user: the allocator is not paused until
// a new agent is added.
if (expectedAgentCount.get() == 0) {
VLOG(1) << "Skipping recovery of hierarchical allocator: "
<< "no reconnecting agents to wait for";
return;
}
// Pause allocation until a sufficient number of agents reregister
// or a timer expires.
pause();
// Setup recovery timer.
delay(ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT, self(), &Self::resume);
LOG(INFO) << "Triggered allocator recovery: waiting for "
<< expectedAgentCount.get() << " agents to reconnect or "
<< ALLOCATION_HOLD_OFF_RECOVERY_TIMEOUT << " to pass";
}
void HierarchicalAllocatorProcess::addFramework(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
const hashmap<SlaveID, Resources>& used,
bool active,
const set<string>& suppressedRoles)
{
CHECK(initialized);
CHECK(!frameworks.contains(frameworkId));
frameworks.insert({frameworkId, Framework(frameworkInfo, suppressedRoles)});
const Framework& framework = frameworks.at(frameworkId);
foreach (const string& role, framework.roles) {
trackFrameworkUnderRole(frameworkId, role);
CHECK(frameworkSorters.contains(role));
if (suppressedRoles.count(role)) {
frameworkSorters.at(role)->deactivate(frameworkId.value());
} else {
frameworkSorters.at(role)->activate(frameworkId.value());
}
}
// TODO(bmahler): Validate that the reserved resources have the
// framework's role.
// Update the allocation for this framework.
foreachpair (const SlaveID& slaveId, const Resources& resources, used) {
// TODO(bmahler): The master won't tell us about resources
// allocated to agents that have not yet been added; consider
// CHECKing this case.
if (!slaves.contains(slaveId)) {
continue;
}
// The slave struct will already be aware of the allocated
// resources, so we only need to track them in the sorters.
trackAllocatedResources(slaveId, frameworkId, resources);
}
LOG(INFO) << "Added framework " << frameworkId;
if (active) {
allocate();
} else {
deactivateFramework(frameworkId);
}
}
void HierarchicalAllocatorProcess::removeFramework(
const FrameworkID& frameworkId)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
const Framework& framework = frameworks.at(frameworkId);
foreach (const string& role, framework.roles) {
// Might not be in 'frameworkSorters[role]' because it
// was previously deactivated and never re-added.
//
// TODO(mzhu): This check may no longer be necessary.
if (!frameworkSorters.contains(role) ||
!frameworkSorters.at(role)->contains(frameworkId.value())) {
continue;
}
hashmap<SlaveID, Resources> allocation =
frameworkSorters.at(role)->allocation(frameworkId.value());
// Update the allocation for this framework.
foreachpair (const SlaveID& slaveId,
const Resources& allocated,
allocation) {
untrackAllocatedResources(slaveId, frameworkId, allocated);
}
untrackFrameworkUnderRole(frameworkId, role);
}
// Do not delete the filters contained in this
// framework's `offerFilters` hashset yet, see comments in
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
frameworks.erase(frameworkId);
LOG(INFO) << "Removed framework " << frameworkId;
}
void HierarchicalAllocatorProcess::activateFramework(
const FrameworkID& frameworkId)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
const Framework& framework = frameworks.at(frameworkId);
// Activate all roles for this framework except the roles that
// are marked as deactivated.
// Note: A subset of framework roles can be deactivated if the
// role is specified in `suppressed_roles` during framework
// (re)registration, or via a subsequent `SUPPRESS` call.
foreach (const string& role, framework.roles) {
CHECK(frameworkSorters.contains(role));
if (!framework.suppressedRoles.count(role)) {
frameworkSorters.at(role)->activate(frameworkId.value());
}
}
LOG(INFO) << "Activated framework " << frameworkId;
allocate();
}
void HierarchicalAllocatorProcess::deactivateFramework(
const FrameworkID& frameworkId)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
Framework& framework = frameworks.at(frameworkId);
foreach (const string& role, framework.roles) {
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->deactivate(frameworkId.value());
// Note that the Sorter *does not* remove the resources allocated
// to this framework. For now, this is important because if the
// framework fails over and is activated, we still want a record
// of the resources that it is using. We might be able to collapse
// the added/removed and activated/deactivated in the future.
}
// Do not delete the filters contained in this
// framework's `offerFilters` hashset yet, see comments in
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
framework.offerFilters.clear();
framework.inverseOfferFilters.clear();
LOG(INFO) << "Deactivated framework " << frameworkId;
}
void HierarchicalAllocatorProcess::updateFramework(
const FrameworkID& frameworkId,
const FrameworkInfo& frameworkInfo,
const set<string>& suppressedRoles)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
Framework& framework = frameworks.at(frameworkId);
set<string> oldRoles = framework.roles;
set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
set<string> oldSuppressedRoles = framework.suppressedRoles;
// The roles which are candidates for deactivation are the roles that are
// removed, as well as the roles which have moved from non-suppressed
// to suppressed mode.
const set<string> rolesToDeactivate = [&]() {
set<string> result = oldRoles;
foreach (const string& role, newRoles) {
result.erase(role);
}
foreach (const string& role, oldRoles) {
if (!oldSuppressedRoles.count(role) && suppressedRoles.count(role)) {
result.insert(role);
}
}
return result;
}();
foreach (const string& role, rolesToDeactivate) {
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->deactivate(frameworkId.value());
// Stop tracking the framework under this role if there are
// no longer any resources allocated to it.
if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) {
untrackFrameworkUnderRole(frameworkId, role);
}
if (framework.offerFilters.contains(role)) {
framework.offerFilters.erase(role);
}
}
// The roles which are candidates for activation are the roles that are
// added, as well as the roles which have moved from suppressed to
// non-suppressed mode.
//
// TODO(anindya_sinha): We should activate the roles only if the
// framework is active (instead of always).
const set<string> rolesToActivate = [&]() {
set<string> result = newRoles;
foreach (const string& role, oldRoles) {
result.erase(role);
}
foreach (const string& role, newRoles) {
if (!suppressedRoles.count(role) && oldSuppressedRoles.count(role)) {
result.insert(role);
} else if (suppressedRoles.count(role)) {
result.erase(role);
}
}
return result;
}();
foreach (const string& role, rolesToActivate) {
// NOTE: It's possible that we're already tracking this framework
// under the role because a framework can unsubscribe from a role
// while it still has resources allocated to the role.
if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
trackFrameworkUnderRole(frameworkId, role);
}
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->activate(frameworkId.value());
}
framework.roles = newRoles;
framework.suppressedRoles = suppressedRoles;
framework.capabilities = frameworkInfo.capabilities();
}
void HierarchicalAllocatorProcess::addSlave(
const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const vector<SlaveInfo::Capability>& capabilities,
const Option<Unavailability>& unavailability,
const Resources& total,
const hashmap<FrameworkID, Resources>& used)
{
CHECK(initialized);
CHECK(!slaves.contains(slaveId));
CHECK(!paused || expectedAgentCount.isSome());
slaves.insert({slaveId,
Slave(
slaveInfo.hostname(),
protobuf::slave::Capabilities(capabilities),
true,
total,
Resources::sum(used))});
Slave& slave = slaves.at(slaveId);
if (slaveInfo.has_domain()) {
slave.domain = slaveInfo.domain();
}
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
if (unavailability.isSome()) {
slave.maintenance = Slave::Maintenance(unavailability.get());
}
trackReservations(total.reservations());
roleSorter->add(slaveId, total);
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->add(slaveId, total.nonRevocable());
foreachpair (const FrameworkID& frameworkId,
const Resources& allocation,
used) {
// There are two cases here:
//
// (1) The framework has already been added to the allocator.
// In this case, we track the allocation in the sorters.
//
// (2) The framework has not yet been added to the allocator.
// The master will imminently add the framework using
// the `FrameworkInfo` recovered from the agent, and in
// the interim we do not track the resources allocated to
// this framework. This leaves a small window where the
// role sorting will under-account for the roles belonging
// to this framework.
//
// TODO(bmahler): Fix the issue outlined in (2).
if (!frameworks.contains(frameworkId)) {
continue;
}
trackAllocatedResources(slaveId, frameworkId, allocation);
}
// If all we have is a count of recovered agents, we cannot distinguish
// between "old" agents from the registry and "new" ones that joined after
// recovery has started. Because we do not persist enough information
// to base logical decisions on, any accounting algorithm here will be
// crude. Hence we opted for checking whether a certain amount of cluster
// capacity is back online, so that we are reasonably confident that we
// will not over-commit too many resources to quota that we will not be
// able to revoke.
if (paused &&
expectedAgentCount.isSome() &&
(static_cast<int>(slaves.size()) >= expectedAgentCount.get())) {
VLOG(1) << "Recovery complete: sufficient amount of agents added; "
<< slaves.size() << " agents known to the allocator";
expectedAgentCount = None();
resume();
}
LOG(INFO) << "Added agent " << slaveId << " (" << slave.hostname << ")"
<< " with " << slave.getTotal()
<< " (allocated: " << slave.getAllocated() << ")";
allocate(slaveId);
}
void HierarchicalAllocatorProcess::removeSlave(
const SlaveID& slaveId)
{
CHECK(initialized);
CHECK(slaves.contains(slaveId));
// TODO(bmahler): Per MESOS-621, this should remove the allocations
// that any frameworks have on this slave. Otherwise the caller may
// "leak" allocated resources accidentally if they forget to recover
// all the resources. Fixing this would require more information
// than what we currently track in the allocator.
roleSorter->remove(slaveId, slaves.at(slaveId).getTotal());
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->remove(
slaveId, slaves.at(slaveId).getTotal().nonRevocable());
untrackReservations(slaves.at(slaveId).getTotal().reservations());
slaves.erase(slaveId);
allocationCandidates.erase(slaveId);
// Note that we DO NOT actually delete any filters associated with
// this slave; that will occur when the delayed
// HierarchicalAllocatorProcess::expire gets invoked (or the framework
// that applied the filters gets removed).
LOG(INFO) << "Removed agent " << slaveId;
}
void HierarchicalAllocatorProcess::updateSlave(
const SlaveID& slaveId,
const Option<Resources>& total,
const Option<vector<SlaveInfo::Capability>>& capabilities)
{
CHECK(initialized);
CHECK(slaves.contains(slaveId));
Slave& slave = slaves.at(slaveId);
bool updated = false;
// Update agent capabilities.
if (capabilities.isSome()) {
protobuf::slave::Capabilities newCapabilities(capabilities.get());
protobuf::slave::Capabilities oldCapabilities(slave.capabilities);
slave.capabilities = newCapabilities;
if (newCapabilities != oldCapabilities) {
updated = true;
LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
<< " updated with capabilities " << slave.capabilities;
}
}
if (total.isSome()) {
updated = updateSlaveTotal(slaveId, total.get());
LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
<< " updated with total resources " << total.get();
}
if (updated) {
allocate(slaveId);
}
}
void HierarchicalAllocatorProcess::activateSlave(
const SlaveID& slaveId)
{
CHECK(initialized);
CHECK(slaves.contains(slaveId));
slaves.at(slaveId).activated = true;
LOG(INFO) << "Agent " << slaveId << " reactivated";
}
void HierarchicalAllocatorProcess::deactivateSlave(
const SlaveID& slaveId)
{
CHECK(initialized);
CHECK(slaves.contains(slaveId));
slaves.at(slaveId).activated = false;
LOG(INFO) << "Agent " << slaveId << " deactivated";
}
void HierarchicalAllocatorProcess::updateWhitelist(
const Option<hashset<string>>& _whitelist)
{
CHECK(initialized);
whitelist = _whitelist;
if (whitelist.isSome()) {
LOG(INFO) << "Updated agent whitelist: " << stringify(whitelist.get());
if (whitelist.get().empty()) {
LOG(WARNING) << "Whitelist is empty, no offers will be made!";
}
} else {
LOG(INFO) << "Advertising offers for all agents";
}
}
void HierarchicalAllocatorProcess::requestResources(
const FrameworkID& frameworkId,
const vector<Request>& requests)
{
CHECK(initialized);
LOG(INFO) << "Received resource request from framework " << frameworkId;
}
void HierarchicalAllocatorProcess::updateAllocation(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const Resources& offeredResources,
const vector<Offer::Operation>& operations)
{
CHECK(initialized);
CHECK(slaves.contains(slaveId));
CHECK(frameworks.contains(frameworkId));
Slave& slave = slaves.at(slaveId);
// We require that an allocation is tied to a single role.
//
// TODO(bmahler): The use of `Resources::allocations()` induces
// unnecessary copying of `Resources` objects (which is expensive
// at the time this was written).
hashmap<string, Resources> allocations = offeredResources.allocations();
CHECK_EQ(1u, allocations.size());
string role = allocations.begin()->first;
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
const Resources frameworkAllocation =
frameworkSorter->allocation(frameworkId.value(), slaveId);
// We keep a copy of the offered resources here and it is updated
// by the operations.
Resources updatedOfferedResources = offeredResources;
// Accumulate consumed resources for all tasks in all `LAUNCH` operations.
//
// For LAUNCH operations we support tasks requesting more instances of
// shared resources than those being offered. We keep track of total
// consumed resources to determine the additional instances and allocate
// them as part of updating the framework's allocation (i.e., add
// them to the allocated resources in the allocator and in each
// of the sorters).
Resources consumed;
// Used for logging.
hashset<TaskID> taskIds;
foreach (const Offer::Operation& operation, operations) {
// The operations should have been normalized by the master via
// `protobuf::injectAllocationInfo()`.
//
// TODO(bmahler): Check that the operations have the allocation
// info set. The master should enforce this. E.g.
//
// foreach (const Offer::Operation& operation, operations) {
// CHECK_NONE(validateOperationOnAllocatedResources(operation));
// }
// Update the offered resources based on this operation.
Try<Resources> _updatedOfferedResources = updatedOfferedResources.apply(
operation);
CHECK_SOME(_updatedOfferedResources);
updatedOfferedResources = _updatedOfferedResources.get();
if (operation.type() == Offer::Operation::LAUNCH) {
foreach (const TaskInfo& task, operation.launch().task_infos()) {
taskIds.insert(task.task_id());
// For now we only need to look at the task resources and
// ignore the executor resources.
//
// TODO(anindya_sinha): For simplicity we currently don't
// allow shared resources in ExecutorInfo. The reason is that
// the allocator has no idea if the executor within the task
// represents a new executor. Therefore we cannot reliably
// determine if the executor resources are needed for this task.
// The TODO is to support it. We need to pass in the information
// pertaining to the executor before enabling shared resources
// in the executor.
consumed += task.resources();
}
}
}
// Check that offered resources contain at least one copy of each
// consumed shared resource (guaranteed by master validation).
Resources consumedShared = consumed.shared();
Resources updatedOfferedShared = updatedOfferedResources.shared();
foreach (const Resource& resource, consumedShared) {
CHECK(updatedOfferedShared.contains(resource));
}
// Determine the additional instances of shared resources needed to be
// added to the allocations.
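// E.g., if the offer contains one copy of a shared volume and two launched
// tasks each consume it, `consumedShared` holds two copies while
// `updatedOfferedShared` holds one, so one additional copy is added below.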
Resources additional = consumedShared - updatedOfferedShared;
if (!additional.empty()) {
LOG(INFO) << "Allocating additional resources " << additional
<< " for tasks " << stringify(taskIds)
<< " of framework " << frameworkId << " on agent " << slaveId;
updatedOfferedResources += additional;
}
// Update the per-slave allocation.
slave.unallocate(offeredResources);
slave.allocate(updatedOfferedResources);
// Update the allocation in the framework sorter.
frameworkSorter->update(
frameworkId.value(),
slaveId,
offeredResources,
updatedOfferedResources);
// Update the allocation in the role sorter.
roleSorter->update(
role,
slaveId,
offeredResources,
updatedOfferedResources);
// Update the allocated resources in the quota sorter. We only update
// the allocated resources if this role has quota set.
if (quotas.contains(role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->update(
role,
slaveId,
offeredResources.nonRevocable(),
updatedOfferedResources.nonRevocable());
}
// Update the agent total resources so they are consistent with the updated
// allocation. We do not directly use `updatedOfferedResources` here because
// the agent's total resources shouldn't contain:
// 1. The additionally allocated shared resources.
// 2. `AllocationInfo` as set in `updatedOfferedResources`.
// We strip `AllocationInfo` from operations in order to apply them
// successfully, since agent's total is stored as unallocated resources.
vector<Offer::Operation> strippedOperations = operations;
foreach (Offer::Operation& operation, strippedOperations) {
protobuf::stripAllocationInfo(&operation);
}
Try<Resources> updatedTotal = slave.getTotal().apply(strippedOperations);
CHECK_SOME(updatedTotal);
updateSlaveTotal(slaveId, updatedTotal.get());
// Update the total resources in the framework sorter.
frameworkSorter->remove(slaveId, offeredResources);
frameworkSorter->add(slaveId, updatedOfferedResources);
// Check that the unreserved quantities for framework allocations
// have not been changed by the above operations.
const Resources updatedFrameworkAllocation =
frameworkSorter->allocation(frameworkId.value(), slaveId);
CHECK_EQ(
frameworkAllocation.toUnreserved().createStrippedScalarQuantity(),
updatedFrameworkAllocation.toUnreserved().createStrippedScalarQuantity());
LOG(INFO) << "Updated allocation of framework " << frameworkId
<< " on agent " << slaveId
<< " from " << frameworkAllocation
<< " to " << updatedFrameworkAllocation;
}
Future<Nothing> HierarchicalAllocatorProcess::updateAvailable(
const SlaveID& slaveId,
const vector<Offer::Operation>& operations)
{
// Note that the operations may contain allocated resources;
// however, such operations can be applied to unallocated
// resources unambiguously, so we don't have a strict CHECK
// for the operations to contain only unallocated resources.
CHECK(initialized);
CHECK(slaves.contains(slaveId));
Slave& slave = slaves.at(slaveId);
// It's possible for this 'apply' to fail here because a call to
// 'allocate' could have been enqueued by the allocator itself
// just before the master's request to enqueue 'updateAvailable'
// arrives at the allocator.
//
// Master -------R------------
// \----+
// |
// Allocator --A-----A-U---A--
// \___/ \___/
//
// where A = allocate, R = reserve, U = updateAvailable
Try<Resources> updatedAvailable = slave.getAvailable().apply(operations);
if (updatedAvailable.isError()) {
return Failure(updatedAvailable.error());
}
// Update the total resources.
Try<Resources> updatedTotal = slave.getTotal().apply(operations);
CHECK_SOME(updatedTotal);
// Update the total resources in the allocator and role and quota sorters.
updateSlaveTotal(slaveId, updatedTotal.get());
return Nothing();
}
void HierarchicalAllocatorProcess::updateUnavailability(
const SlaveID& slaveId,
const Option<Unavailability>& unavailability)
{
CHECK(initialized);
CHECK(slaves.contains(slaveId));
Slave& slave = slaves.at(slaveId);
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
// We explicitly remove all filters for the inverse offers of this slave. We
// do this because we want to force frameworks to reassess the calculations
// they have made to respond to the inverse offer. Unavailability of a slave
// can have a large effect on failure domain calculations and interleaved
// unavailability schedules.
foreachvalue (Framework& framework, frameworks) {
framework.inverseOfferFilters.erase(slaveId);
}
// Remove any old unavailability.
slave.maintenance = None();
// If we have a new unavailability.
if (unavailability.isSome()) {
slave.maintenance = Slave::Maintenance(unavailability.get());
}
allocate(slaveId);
}
void HierarchicalAllocatorProcess::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
const Option<InverseOfferStatus>& status,
const Option<Filters>& filters)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
CHECK(slaves.contains(slaveId));
Framework& framework = frameworks.at(frameworkId);
Slave& slave = slaves.at(slaveId);
CHECK(slave.maintenance.isSome());
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
// We use a reference alias because we intend to modify
// `maintenance`, and to improve readability.
Slave::Maintenance& maintenance = slave.maintenance.get();
// Only handle inverse offers that we currently have outstanding. If it is not
// currently outstanding, it is old and can be safely ignored.
if (maintenance.offersOutstanding.contains(frameworkId)) {
// We always remove the outstanding offer so that we will send a new offer
// out the next time we schedule inverse offers.
maintenance.offersOutstanding.erase(frameworkId);
// If the response is `Some`, this means the framework responded. Otherwise,
// if it is `None`, the inverse offer timed out or was rescinded.
if (status.isSome()) {
// For now we don't allow frameworks to respond with `UNKNOWN`. The caller
// should guard against this. This goes against the pattern of not
// checking external invariants; however, the allocator and master are
// currently so tightly coupled that this check is valuable.
CHECK_NE(status.get().status(), InverseOfferStatus::UNKNOWN);
// If the framework responded, we update our state to match.
maintenance.statuses[frameworkId].CopyFrom(status.get());
}
}
// No need to install filters if `filters` is none.
if (filters.isNone()) {
return;
}
// Create a refused resource filter.
Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
if (seconds.isError()) {
LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
<< "the refused inverse offer filter because the input value "
<< "is invalid: " << seconds.error();
seconds = Duration::create(Filters().refuse_seconds());
} else if (seconds.get() < Duration::zero()) {
LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
<< "the refused inverse offer filter because the input value "
<< "is negative";
seconds = Duration::create(Filters().refuse_seconds());
}
CHECK_SOME(seconds);
if (seconds.get() != Duration::zero()) {
VLOG(1) << "Framework " << frameworkId
<< " filtered inverse offers from agent " << slaveId
<< " for " << seconds.get();
// Create a new inverse offer filter and delay its expiration.
InverseOfferFilter* inverseOfferFilter =
new RefusedInverseOfferFilter(Timeout::in(seconds.get()));
framework.inverseOfferFilters[slaveId].insert(inverseOfferFilter);
// We need to disambiguate the function call to pick the correct
// `expire()` overload.
void (Self::*expireInverseOffer)(
const FrameworkID&,
const SlaveID&,
InverseOfferFilter*) = &Self::expire;
delay(
seconds.get(),
self(),
expireInverseOffer,
frameworkId,
slaveId,
inverseOfferFilter);
}
}
Future<hashmap<SlaveID, hashmap<FrameworkID, InverseOfferStatus>>>
HierarchicalAllocatorProcess::getInverseOfferStatuses()
{
CHECK(initialized);
hashmap<SlaveID, hashmap<FrameworkID, InverseOfferStatus>> result;
// Make a copy of the most recent statuses.
foreachpair (const SlaveID& id, const Slave& slave, slaves) {
if (slave.maintenance.isSome()) {
result[id] = slave.maintenance.get().statuses;
}
}
return result;
}
void HierarchicalAllocatorProcess::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
const Resources& resources,
const Option<Filters>& filters)
{
CHECK(initialized);
if (resources.empty()) {
return;
}
// For now, we require that resources are recovered within a single
// allocation role (since filtering in the same manner across roles
// seems undesirable).
//
// TODO(bmahler): The use of `Resources::allocations()` induces
// unnecessary copying of `Resources` objects (which is expensive
// at the time this was written).
hashmap<string, Resources> allocations = resources.allocations();
CHECK_EQ(1u, allocations.size());
string role = allocations.begin()->first;
// Update the resources allocated to the framework (if the framework
// still exists, which it might not in the event that we dispatched
// Master::offer before we received
// MesosAllocatorProcess::removeFramework or
// MesosAllocatorProcess::deactivateFramework, in which case we will
// have already recovered all of its resources).
if (frameworks.contains(frameworkId)) {
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
if (frameworkSorter->contains(frameworkId.value())) {
untrackAllocatedResources(slaveId, frameworkId, resources);
// Stop tracking the framework under this role if it's no longer
// subscribed and no longer has resources allocated to the role.
if (frameworks.at(frameworkId).roles.count(role) == 0 &&
frameworkSorter->allocation(frameworkId.value()).empty()) {
untrackFrameworkUnderRole(frameworkId, role);
}
}
}
// Update resources allocated on slave (if slave still exists,
// which it might not in the event that we dispatched Master::offer
// before we received Allocator::removeSlave).
if (slaves.contains(slaveId)) {
Slave& slave = slaves.at(slaveId);
CHECK(slave.getAllocated().contains(resources))
<< slave.getAllocated() << " does not contain " << resources;
slave.unallocate(resources);
VLOG(1) << "Recovered " << resources
<< " (total: " << slave.getTotal()
<< ", allocated: " << slave.getAllocated() << ")"
<< " on agent " << slaveId
<< " from framework " << frameworkId;
}
// No need to install the filter if 'filters' is none.
if (filters.isNone()) {
return;
}
// No need to install the filter if slave/framework does not exist.
if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
return;
}
// Create a refused resources filter.
Try<Duration> timeout = Duration::create(filters.get().refuse_seconds());
if (timeout.isError()) {
LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
<< "the refused resources filter because the input value "
<< "is invalid: " << timeout.error();
timeout = Duration::create(Filters().refuse_seconds());
} else if (timeout.get() < Duration::zero()) {
LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
<< "the refused resources filter because the input value "
<< "is negative";
timeout = Duration::create(Filters().refuse_seconds());
}
CHECK_SOME(timeout);
if (timeout.get() != Duration::zero()) {
VLOG(1) << "Framework " << frameworkId
<< " filtered agent " << slaveId
<< " for " << timeout.get();
// Create a new filter. Note that we unallocate the resources
// since filters are applied per-role already.
Resources unallocated = resources;
unallocated.unallocate();
OfferFilter* offerFilter = new RefusedOfferFilter(unallocated);
frameworks.at(frameworkId)
.offerFilters[role][slaveId].insert(offerFilter);
// Expire the filter after both an `allocationInterval` and the
// `timeout` have elapsed. This ensures that the filter does not
// expire before we perform the next allocation for this agent,
// see MESOS-4302 for more information.
//
// Because the next periodic allocation goes through a dispatch
// after `allocationInterval`, we do the same for `expire()`
// (with a helper `_expire()`) to achieve the above.
//
// TODO(alexr): If we allocated upon resource recovery
// (MESOS-3078), we would not need to increase the timeout here.
timeout = std::max(allocationInterval, timeout.get());
// We need to disambiguate the function call to pick the correct
// `expire()` overload.
void (Self::*expireOffer)(
const FrameworkID&,
const string&,
const SlaveID&,
OfferFilter*) = &Self::expire;
delay(timeout.get(),
self(),
expireOffer,
frameworkId,
role,
slaveId,
offerFilter);
}
}
void HierarchicalAllocatorProcess::suppressOffers(
const FrameworkID& frameworkId,
const set<string>& roles_)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
Framework& framework = frameworks.at(frameworkId);
// Deactivating the framework in the sorter is fine as long as
// SUPPRESS is not parameterized. When parameterization is added,
// we have to differentiate between the cases here.
const set<string>& roles = roles_.empty() ? framework.roles : roles_;
foreach (const string& role, roles) {
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->deactivate(frameworkId.value());
framework.suppressedRoles.insert(role);
}
LOG(INFO) << "Suppressed offers for roles " << stringify(roles)
<< " of framework " << frameworkId;
}
void HierarchicalAllocatorProcess::reviveOffers(
const FrameworkID& frameworkId,
const set<string>& roles_)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
Framework& framework = frameworks.at(frameworkId);
framework.offerFilters.clear();
framework.inverseOfferFilters.clear();
const set<string>& roles = roles_.empty() ? framework.roles : roles_;
// Activating the framework in the sorter on REVIVE is fine as long as
// SUPPRESS is not parameterized. When parameterization is added,
// we may need to differentiate between the cases here.
foreach (const string& role, roles) {
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->activate(frameworkId.value());
framework.suppressedRoles.erase(role);
}
// We delete each actual `OfferFilter` when
// `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
// `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
// address) could get reused and `HierarchicalAllocatorProcess::expire`
// would expire that filter too soon. Note that this only works
// right now because ALL Filter types "expire".
LOG(INFO) << "Revived offers for roles " << stringify(roles)
<< " of framework " << frameworkId;
allocate();
}
void HierarchicalAllocatorProcess::setQuota(
const string& role,
const Quota& quota)
{
CHECK(initialized);
// This method should be called by the master only if the quota for
// the role is not set. Setting quota differs from updating it because
// the former moves the role to a different allocation group with a
// dedicated sorter, while the latter just updates the actual quota.
CHECK(!quotas.contains(role));
// Persist quota in memory and add the role into the corresponding
// allocation group.
quotas[role] = quota;
quotaRoleSorter->add(role);
quotaRoleSorter->activate(role);
// Copy allocation information for the quota'ed role.
if (roleSorter->contains(role)) {
hashmap<SlaveID, Resources> roleAllocation = roleSorter->allocation(role);
foreachpair (
const SlaveID& slaveId, const Resources& resources, roleAllocation) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->allocated(role, slaveId, resources.nonRevocable());
}
}
metrics.setQuota(role, quota);
// TODO(alexr): Print all quota info for the role.
LOG(INFO) << "Set quota " << quota.info.guarantee() << " for role '" << role
<< "'";
// NOTE: Since quota changes do not result in rebalancing of
// offered resources, we do not trigger an allocation here; the
// quota change will be reflected in subsequent allocations.
//
// If we add the ability for quota changes to incur a rebalancing
// of offered resources, then we should trigger that here.
}
void HierarchicalAllocatorProcess::removeQuota(
const string& role)
{
CHECK(initialized);
// Do not allow removing quota if it is not set.
CHECK(quotas.contains(role));
CHECK(quotaRoleSorter->contains(role));
// TODO(alexr): Print all quota info for the role.
LOG(INFO) << "Removed quota " << quotas[role].info.guarantee()
<< " for role '" << role << "'";
// Remove the role from the quota'ed allocation group.
quotas.erase(role);
quotaRoleSorter->remove(role);
metrics.removeQuota(role);
// NOTE: Since quota changes do not result in rebalancing of
// offered resources, we do not trigger an allocation here; the
// quota change will be reflected in subsequent allocations.
//
// If we add the ability for quota changes to incur a rebalancing
// of offered resources, then we should trigger that here.
}
void HierarchicalAllocatorProcess::updateWeights(
const vector<WeightInfo>& weightInfos)
{
CHECK(initialized);
foreach (const WeightInfo& weightInfo, weightInfos) {
CHECK(weightInfo.has_role());
quotaRoleSorter->updateWeight(weightInfo.role(), weightInfo.weight());
roleSorter->updateWeight(weightInfo.role(), weightInfo.weight());
}
// NOTE: Since weight changes do not result in rebalancing of
// offered resources, we do not trigger an allocation here; the
// weight change will be reflected in subsequent allocations.
//
// If we add the ability for weight changes to incur a rebalancing
// of offered resources, then we should trigger that here.
}
void HierarchicalAllocatorProcess::pause()
{
if (!paused) {
VLOG(1) << "Allocation paused";
paused = true;
}
}
void HierarchicalAllocatorProcess::resume()
{
if (paused) {
VLOG(1) << "Allocation resumed";
paused = false;
}
}
Future<Nothing> HierarchicalAllocatorProcess::allocate()
{
return allocate(slaves.keys());
}
Future<Nothing> HierarchicalAllocatorProcess::allocate(
const SlaveID& slaveId)
{
hashset<SlaveID> slaves({slaveId});
return allocate(slaves);
}
Future<Nothing> HierarchicalAllocatorProcess::allocate(
const hashset<SlaveID>& slaveIds)
{
if (paused) {
VLOG(1) << "Skipped allocation because the allocator is paused";
return Nothing();
}
allocationCandidates |= slaveIds;
if (allocation.isNone() || !allocation->isPending()) {
metrics.allocation_run_latency.start();
allocation = dispatch(self(), &Self::_allocate);
}
return allocation.get();
}
Nothing HierarchicalAllocatorProcess::_allocate()
{
metrics.allocation_run_latency.stop();
if (paused) {
VLOG(1) << "Skipped allocation because the allocator is paused";
return Nothing();
}
++metrics.allocation_runs;
Stopwatch stopwatch;
stopwatch.start();
metrics.allocation_run.start();
__allocate();
// NOTE: For now, we implement maintenance inverse offers within the
// allocator. We leverage the existing timer/cycle of offers to also do any
// "deallocation" (inverse offers) necessary to satisfy maintenance needs.
deallocate();
metrics.allocation_run.stop();
VLOG(1) << "Performed allocation for " << allocationCandidates.size()
<< " agents in " << stopwatch.elapsed();
// Clear the candidates on completion of the allocation run.
allocationCandidates.clear();
return Nothing();
}
// TODO(alexr): Consider factoring out the quota allocation logic.
void HierarchicalAllocatorProcess::__allocate()
{
// Compute the offerable resources, per framework:
// (1) For reserved resources on the slave, allocate these to a
// framework having the corresponding role.
// (2) For unreserved resources on the slave, allocate these
// to a framework of any role.
hashmap<FrameworkID, hashmap<string, hashmap<SlaveID, Resources>>> offerable;
// NOTE: This function can operate on a small subset of
// `allocationCandidates`, so we have to make sure that we don't
// assume cluster knowledge when summing resources from that set.
vector<SlaveID> slaveIds;
slaveIds.reserve(allocationCandidates.size());
// Filter out non-whitelisted, removed, and deactivated slaves
// in order not to send offers for them.
foreach (const SlaveID& slaveId, allocationCandidates) {
if (isWhitelisted(slaveId) &&
slaves.contains(slaveId) &&
slaves.at(slaveId).activated) {
slaveIds.push_back(slaveId);
}
}
// Randomize the order in which slaves' resources are allocated.
//
// TODO(vinod): Implement a smarter sorting algorithm.
std::random_shuffle(slaveIds.begin(), slaveIds.end());
// Returns the __quantity__ of resources allocated to a quota role. Since we
// account for reservations and persistent volumes toward quota, we strip
// reservation and persistent volume related information for comparability.
// The result is used to determine whether a role's quota is satisfied, and
// also to determine how many resources the role would need in order to meet
// its quota.
//
// NOTE: Revocable resources are excluded in `quotaRoleSorter`.
auto getQuotaRoleAllocatedResources = [this](const string& role) {
CHECK(quotas.contains(role));
return quotaRoleSorter->allocationScalarQuantities(role);
};
// We need to keep track of allocated reserved resources for roles
// with quota in order to enforce their quota limit. Note these are
// __quantities__ with no meta-data.
hashmap<string, Resources> allocatedReservationScalarQuantities;
// We build the map here to avoid repetitive aggregation
// in the allocation loop. Note, this map will still need to be
// updated during the allocation loop when new allocations
// are made.
//
// TODO(mzhu): Ideally, we want to buildup and persist this information
// across allocation cycles in track/untrackAllocatedResources().
// But due to the presence of shared resources, we need to keep track of
// the allocated resources (and not just scalar quantities) on a slave
// to account for multiple copies of the same shared resources.
// While the `allocated` info in the `struct slave` gives us just that,
// we can not simply use that in track/untrackAllocatedResources() since
// `allocated` is currently updated outside the scope of
// track/untrackAllocatedResources(), meaning that it may get updated
// either before or after the tracking calls.
//
// TODO(mzhu): Ideally, we want these helpers to instead track the
// reservations as *allocated* in the sorters even when the
// reservations have not been allocated yet. This will help to:
//
// (1) Solve the fairness issue when roles with unallocated
// reservations may game the allocator (See MESOS-8299).
//
// (2) Simplify the quota enforcement logic -- the allocator
// would no longer need to track reservations separately.
foreachkey (const string& role, quotas) {
const hashmap<SlaveID, Resources> allocations =
quotaRoleSorter->allocation(role);
foreachvalue (const Resources& resources, allocations) {
allocatedReservationScalarQuantities[role] +=
resources.reserved().createStrippedScalarQuantity();
}
}
// We need to constantly make sure that we are holding back enough unreserved
// resources that the remaining quota can later be satisfied when needed:
//
// Required unreserved headroom =
// sum (unsatisfied quota(r) - unallocated reservations(r))
// for each quota role r
//
// Given the above, if a role has more reservations than quota,
// we don't need to hold back any unreserved headroom for it.
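//
// For example, a quota role with a 10 cpus guarantee, 4 cpus already
// allocated, and 2 cpus of unallocated reservations contributes
// 10 - 4 - 2 = 4 cpus to the required headroom.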
Resources requiredHeadroom;
foreachpair (const string& role, const Quota& quota, quotas) {
// NOTE: Revocable resources are excluded in `quotaRoleSorter`.
// NOTE: Only scalars are considered for quota.
// NOTE: The following should all be quantities with no meta-data!
Resources allocated = getQuotaRoleAllocatedResources(role);
const Resources guarantee = quota.info.guarantee();
if (allocated.contains(guarantee)) {
continue; // Quota already satisfied.
}
Resources unallocated = guarantee - allocated;
Resources unallocatedReservations =
reservationScalarQuantities.get(role).getOrElse(Resources()) -
allocatedReservationScalarQuantities.get(role).getOrElse(Resources());
requiredHeadroom += unallocated - unallocatedReservations;
}
// We will allocate resources while ensuring that the required
// unreserved non-revocable headroom is still available. Otherwise,
// we will not be able to satisfy quota later.
//
// available headroom = unallocated unreserved non-revocable resources
//
// We compute this as:
//
// available headroom = total resources -
// allocated resources -
// unallocated reservations -
// unallocated revocable resources
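//
// For example, in a cluster with 100 cpus in total, 40 cpus allocated,
// 10 cpus of unallocated reservations, and 5 cpus of unallocated
// revocable resources, the available headroom is 45 cpus.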
Resources availableHeadroom = roleSorter->totalScalarQuantities();
// Subtract allocated resources from the total.
foreachkey (const string& role, roles) {
availableHeadroom -= roleSorter->allocationScalarQuantities(role);
}
// Calculate total allocated reservations. Note that we need to ensure
// we count a reservation for "a" being allocated to "a/b", therefore
// we cannot simply loop over the reservations' roles.
Resources totalAllocatedReservationScalarQuantities;
foreachkey (const string& role, roles) {
hashmap<SlaveID, Resources> allocations;
if (quotaRoleSorter->contains(role)) {
allocations = quotaRoleSorter->allocation(role);
} else if (roleSorter->contains(role)) {
allocations = roleSorter->allocation(role);
} else {
continue; // This role has no allocation.
}
foreachvalue (const Resources& resources, allocations) {
totalAllocatedReservationScalarQuantities +=
resources.reserved().createStrippedScalarQuantity();
}
}
// Subtract total unallocated reservations.
availableHeadroom -=
Resources::sum(reservationScalarQuantities) -
totalAllocatedReservationScalarQuantities;
// Subtract revocable resources.
foreachvalue (const Slave& slave, slaves) {
availableHeadroom -=
slave.getAvailable().revocable().createStrippedScalarQuantity();
}
// Due to the two stages in the allocation algorithm and the nature of
// shared resources being re-offerable even if already allocated, the
// same shared resources can appear in two (and not more due to the
// `allocatable` check in each stage) distinct offers in one allocation
// cycle. This is undesirable since the allocator API contract should
// not depend on its implementation details. For now we make sure a
// shared resource is only allocated once in one offer cycle. We use
// `offeredSharedResources` to keep track of shared resources already
// allocated in the current cycle.
hashmap<SlaveID, Resources> offeredSharedResources;
// Quota comes first and fair share second. Here we process only those
// roles for which quota is set (quota'ed roles). Such roles form a
// special allocation group with a dedicated sorter.
foreach (const SlaveID& slaveId, slaveIds) {
foreach (const string& role, quotaRoleSorter->sort()) {
CHECK(quotas.contains(role));
const Quota& quota = quotas.at(role);
// If there are no active frameworks in this role, we do not
// need to do any allocations for this role.
if (!roles.contains(role)) {
continue;
}
// This is a __quantity__ with no meta-data.
Resources roleReservationScalarQuantities =
reservationScalarQuantities.get(role).getOrElse(Resources());
// This is a __quantity__ with no meta-data.
Resources roleAllocatedReservationScalarQuantities =
allocatedReservationScalarQuantities.get(role).getOrElse(Resources());
// We charge a role against its quota by considering its
// allocation as well as any unallocated reservations
// since reservations are bound to the role. In other
// words, we always consider reservations as consuming
// quota, regardless of whether they are allocated.
// The equation used here is:
//
// Consumed Quota = reservations + unreserved allocation
// = reservations + (allocation - allocated reservations)
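//
// For example, a role with 3 cpus of reservations (all allocated) and
// 4 cpus of unreserved allocation is charged 3 + (7 - 3) = 7 cpus
// against its quota.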
//
// This is a __quantity__ with no meta-data.
Resources resourcesChargedAgainstQuota =
roleReservationScalarQuantities +
(getQuotaRoleAllocatedResources(role) -
roleAllocatedReservationScalarQuantities);
// If quota for the role is considered satisfied, then we only
// further allocate reservations for the role.
//
// TODO(alexr): Skipping satisfied roles is pessimistic. Better
// alternatives are:
// * A custom sorter that is aware of quotas and sorts accordingly.
// * Removing satisfied roles from the sorter.
//
// This is a scalar quantity with no meta-data.
Resources unsatisfiedQuota = Resources(quota.info.guarantee()) -
resourcesChargedAgainstQuota;
// Fetch frameworks according to their fair share.
// NOTE: Suppressed frameworks are not included in the sort.
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
foreach (const string& frameworkId_, frameworkSorter->sort()) {
FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);
CHECK(slaves.contains(slaveId));
CHECK(frameworks.contains(frameworkId));
const Framework& framework = frameworks.at(frameworkId);
Slave& slave = slaves.at(slaveId);
// Only offer resources from slaves that have GPUs to
// frameworks that are capable of receiving GPUs.
// See MESOS-5634.
if (filterGpuResources &&
!framework.capabilities.gpuResources &&
slave.getTotal().gpus().getOrElse(0) > 0) {
continue;
}
// If this framework is not region-aware, don't offer it
// resources on agents in remote regions.
if (!framework.capabilities.regionAware && isRemoteSlave(slave)) {
continue;
}
// Calculate the currently available resources on the slave, which
// is the difference in non-shared resources between total and
// allocated, plus all shared resources on the agent (if applicable).
// Since shared resources are offerable even when they are in use, we
// make one copy of the shared resources available regardless of the
// past allocations.
Resources available = slave.getAvailable().nonShared();
// Offer a shared resource only if it has not been offered in
// this offer cycle to a framework.
if (framework.capabilities.sharedResources) {
available += slave.getTotal().shared();
if (offeredSharedResources.contains(slaveId)) {
available -= offeredSharedResources[slaveId];
}
}
// We allocate the role's reservations as well as any unreserved
// resources while ensuring the role stays within its quota limits.
// This means that we'll "chop" the unreserved resources up to
// the quota limit if necessary.
//
// E.g. A role has no allocations or reservations yet and a 10 cpu
// quota limit. We'll chop a 15 cpu agent down to only
// allocate 10 cpus to the role to keep it within its limit.
//
// In the case that the role needs some of the resources on this
// agent to make progress towards its quota, or the role is being
// allocated some reservation(s), we'll *also* allocate all of
// the resources for which it does not have quota.
//
// E.g. The agent has 1 cpu, 1024 mem, 1024 disk, 1 gpu, 5 ports
// and the role has quota for 1 cpu, 1024 mem. We'll include
// the disk, gpu, and ports in the allocation, despite the
// role not having any quota guarantee for them.
//
// We have to do this for now because it's not possible to set
// quota on non-scalar resources, like ports. For scalar resources
// that this role has no quota for, it can be allocated as long
// as the quota headroom is not violated.
//
// TODO(mzhu): Since we're treating the resources with unset
// quota as having no guarantee and no limit, these should
// also be allocated further in the second allocation "phase"
// below (above guarantee up to limit).
// NOTE: Currently, frameworks are allowed to have '*' role.
// Calling reserved('*') returns an empty Resources object.
//
// NOTE: Since we currently only support top-level roles to
// have quota, there are no ancestor reservations involved here.
Resources resources = available.reserved(role).nonRevocable();
// Unreserved resources that are tentatively going to be
// allocated towards this role's quota. These resources may
// not get allocated due to framework filters.
// These are __quantities__ with no meta-data.
Resources newQuotaAllocationScalarQuantities;
// We tentatively put resources that this role has no quota for
// into `nonQuotaResources`.
Resources nonQuotaResources;
Resources unreserved = available.nonRevocable().unreserved();
set<string> quotaResourceNames =
Resources(quota.info.guarantee()).names();
// When "chopping" resources, there is more than 1 "chop" that
// can be done to satisfy the limits. Consider the case with
// two disks of 1GB, one is PATH and another is MOUNT. And a
// role has a "disk" quota of 1GB. We could pick either of
// the disks here, but not both.
//
// In order to avoid repeatedly choosing the same "chop" of
// the resources each time we allocate, we introduce some
// randomness by shuffling the resources.
google::protobuf::RepeatedPtrField<Resource>
resourceVector = unreserved;
random_shuffle(resourceVector.begin(), resourceVector.end());
foreach (Resource& resource, resourceVector) {
if (resource.type() != Value::SCALAR) {
// We currently do not support quota for non-scalar resources,
// add it to `nonQuotaResources`. See `nonQuotaResources`
// regarding how these resources are allocated.
nonQuotaResources += resource;
continue;
}
if (quotaResourceNames.count(resource.name()) == 0) {
// When allocating resources that this role has NO quota for,
// the limit concern here is that they should not break the
// quota headroom.
//
// Allocation Limit = Available Headroom - Required Headroom -
// Tentative Allocation to Role
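//
// E.g. Suppose the available headroom is 10 cpus, the required
// headroom is 6 cpus, and we have tentatively allocated 1 cpu to
// this role so far; then at most 10 - 6 - 1 = 3 more cpus can be
// allocated here without creating a headroom deficit.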
Resources upperLimitScalarQuantities =
availableHeadroom - requiredHeadroom -
(newQuotaAllocationScalarQuantities +
nonQuotaResources.createStrippedScalarQuantity());
Option<Value::Scalar> limitScalar =
upperLimitScalarQuantities.get<Value::Scalar>(resource.name());
if (limitScalar.isNone()) {
continue; // Already have a headroom deficit.
}
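// Try to shrink the resource so that it fits within the remaining
// limit. Some resources cannot be chopped (e.g. MOUNT disks are
// allocated as a whole); in that case `shrink()` fails and we skip
// the resource for this allocation.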
if (Resources::shrink(&resource, limitScalar.get())) {
nonQuotaResources += resource;
}
} else {
// We are allocating a resource that this role has quota for;
// the limit concern is that it should not exceed this
// role's unsatisfied quota.
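//
// E.g. If the role's unsatisfied "cpus" quota is 4 and 1 cpu has
// already been tentatively allocated towards it in this loop, this
// resource is shrunk to at most 3 cpus.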
Resources upperLimitScalarQuantities =
unsatisfiedQuota - newQuotaAllocationScalarQuantities;
Option<Value::Scalar> limitScalar =
upperLimitScalarQuantities.get<Value::Scalar>(resource.name());
if (limitScalar.isNone()) {
continue; // Quota limit already met.
}
if (Resources::shrink(&resource, limitScalar.get())) {
resources += resource;
newQuotaAllocationScalarQuantities +=
Resources(resource).createStrippedScalarQuantity();
}
}
}
// We include the non-quota resources (with headroom taken
// into account) if this role is being allocated some resources
// already: either some quota resources or a reservation
// (possibly with quota resources).
if (!resources.empty()) {
resources += nonQuotaResources;
}
// It is safe to break here, because all frameworks under a role would
// consider the same resources, so in case we don't have allocatable
// resources, we don't have to check for other frameworks under the
// same role. We only break out of the innermost loop, so the next step
// will use the same `slaveId`, but a different role.
//
// NOTE: The resources may not be allocatable here, but they can be
// accepted by one of the frameworks during the second allocation
// stage.
if (!allocatable(resources)) {
break;
}
// When reservation refinements are present, old frameworks without the
// RESERVATION_REFINEMENT capability won't be able to understand the
// new format. While it's possible to translate the refined reservations
// into the old format by "hiding" the intermediate reservations in the
// "stack", this leads to ambiguity when processing RESERVE / UNRESERVE
// operations. This is due to the loss of information when we drop the
// intermediate reservations. Therefore, for now we simply filter out
// resources with refined reservations if the framework does not have
// the capability.
if (!framework.capabilities.reservationRefinement) {
resources = resources.filter([](const Resource& resource) {
return !Resources::hasRefinedReservations(resource);
});
}
// If the framework filters these resources, ignore. The unallocated
// part of the quota will not be allocated to other roles.
if (isFiltered(frameworkId, role, slaveId, resources)) {
continue;
}
VLOG(2) << "Allocating " << resources << " on agent " << slaveId
<< " to role " << role << " of framework " << frameworkId
<< " as part of its role quota";
resources.allocate(role);
// NOTE: We perform "coarse-grained" allocation for quota'ed
// resources, which may lead to overcommitment of resources beyond
// quota. This is fine since quota currently represents a guarantee.
offerable[frameworkId][role][slaveId] += resources;
offeredSharedResources[slaveId] += resources.shared();
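// Deduct what we just allocated towards the quota from the role's
// remaining unsatisfied quota.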
unsatisfiedQuota -= newQuotaAllocationScalarQuantities;
// Track quota headroom change.
requiredHeadroom -= newQuotaAllocationScalarQuantities;
availableHeadroom -=
resources.unreserved().createStrippedScalarQuantity();
// Update the tracking of allocated reservations.
//
// Note it is important to do this before updating `slave.allocated`
// because we rely on `slave.allocated` to avoid accounting for
// multiple copies of the same shared resources.
const Resources newShared = resources.shared()
.filter([this, &slaveId](const Resources& resource) {
return !slaves.at(slaveId).getAllocated().contains(resource);
});
// We remove the static reservation metadata here via `toUnreserved()`.
allocatedReservationScalarQuantities[role] +=
(resources.reserved(role).nonShared() + newShared)
.createStrippedScalarQuantity().toUnreserved();
slave.allocate(resources);
trackAllocatedResources(slaveId, frameworkId, resources);
}
}
}
// Similar to the first stage, we will allocate resources while ensuring
// that the required unreserved non-revocable headroom is still available.
// Otherwise, we will not be able to satisfy quota later. Reservations to
// non-quota roles and revocable resources will always be included
// in the offers since these are not part of the headroom (and
// therefore can't be used to satisfy quota).
foreach (const SlaveID& slaveId, slaveIds) {
foreach (const string& role, roleSorter->sort()) {
// In the second allocation stage, we only allocate
// for non-quota roles.
if (quotas.contains(role)) {
continue;
}
// NOTE: Suppressed frameworks are not included in the sort.
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters.at(role);
foreach (const string& frameworkId_, frameworkSorter->sort()) {
FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);
CHECK(slaves.contains(slaveId));
CHECK(frameworks.contains(frameworkId));
const Framework& framework = frameworks.at(frameworkId);
Slave& slave = slaves.at(slaveId);
// Only offer resources from slaves that have GPUs to
// frameworks that are capable of receiving GPUs.
// See MESOS-5634.
if (filterGpuResources &&
!framework.capabilities.gpuResources &&
slave.getTotal().gpus().getOrElse(0) > 0) {
continue;
}
// If this framework is not region-aware, don't offer it
// resources on agents in remote regions.
if (!framework.capabilities.regionAware && isRemoteSlave(slave)) {
continue;
}
// Calculate the currently available resources on the slave, which
// is the difference in non-shared resources between total and
// allocated, plus all shared resources on the agent (if applicable).
// Since shared resources are offerable even when they are in use, we
// make one copy of the shared resources available regardless of the
// past allocations.
Resources available = slave.getAvailable().nonShared();
// Offer a shared resource only if it has not been offered in
// this offer cycle to a framework.
if (framework.capabilities.sharedResources) {
available += slave.getTotal().shared();
if (offeredSharedResources.contains(slaveId)) {
available -= offeredSharedResources[slaveId];
}
}
// The resources we offer are the unreserved resources as well as the
// reserved resources for this particular role and all its ancestors
// in the role hierarchy.
//
// NOTE: Currently, frameworks are allowed to have '*' role.
// Calling reserved('*') returns an empty Resources object.
//
// TODO(mpark): Offer unreserved resources as revocable beyond quota.
Resources resources = available.allocatableTo(role);
// It is safe to break here, because all frameworks under a role would
// consider the same resources, so in case we don't have allocatable
// resources, we don't have to check for other frameworks under the
// same role. We only break out of the innermost loop, so the next step
// will use the same slaveId, but a different role.
//
// The difference from the second `allocatable` check below is that here
// we also check for revocable resources, which can be disabled on a
// per-framework basis; this requires us to go through all frameworks in
// case we have allocatable revocable resources.
if (!allocatable(resources)) {
break;
}
// Remove revocable resources if the framework has not opted for them.
if (!framework.capabilities.revocableResources) {
resources = resources.nonRevocable();
}
// When reservation refinements are present, old frameworks without the
// RESERVATION_REFINEMENT capability won't be able to understand the
// new format. While it's possible to translate the refined reservations
// into the old format by "hiding" the intermediate reservations in the
// "stack", this leads to ambiguity when processing RESERVE / UNRESERVE
// operations. This is due to the loss of information when we drop the
// intermediate reservations. Therefore, for now we simply filter out
// resources with refined reservations if the framework does not have
// the capability.
if (!framework.capabilities.reservationRefinement) {
resources = resources.filter([](const Resource& resource) {
return !Resources::hasRefinedReservations(resource);
});
}
// If allocating these resources would reduce the headroom
// below what is required, we will hold them back.
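//
// E.g. Suppose the required headroom is 10 cpus, the available
// headroom is 12 cpus, and this allocation would consume 5
// unreserved non-revocable cpus; 12 - 5 = 7 < 10, so we would hold
// these resources back.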
const Resources headroomToAllocate = resources
.scalars().unreserved().nonRevocable();
bool sufficientHeadroom =
(availableHeadroom -
headroomToAllocate.createStrippedScalarQuantity())
.contains(requiredHeadroom);
if (!sufficientHeadroom) {
resources -= headroomToAllocate;
}
// If the resources are not allocatable, ignore. We cannot break
// here, because another framework under the same role could accept
// revocable resources and breaking would skip all other frameworks.
if (!allocatable(resources)) {
continue;
}
// If the framework filters these resources, ignore.
if (isFiltered(frameworkId, role, slaveId, resources)) {
continue;
}
VLOG(2) << "Allocating " << resources << " on agent " << slaveId
<< " to role " << role << " of framework " << frameworkId;
resources.allocate(role);
// NOTE: We perform "coarse-grained" allocation, meaning that we always
// allocate the entire remaining slave resources to a single framework.
//
// NOTE: We may have already allocated some resources on the current
// agent as part of quota.
offerable[frameworkId][role][slaveId] += resources;
offeredSharedResources[slaveId] += resources.shared();
if (sufficientHeadroom) {
availableHeadroom -=
headroomToAllocate.createStrippedScalarQuantity();
}
slave.allocate(resources);
trackAllocatedResources(slaveId, frameworkId, resources);
}
}
}
if (offerable.empty()) {
VLOG(1) << "No allocations performed";
} else {
// Now offer the resources to each framework.
foreachkey (const FrameworkID& frameworkId, offerable) {
offerCallback(frameworkId, offerable.at(frameworkId));
}
}
}
void HierarchicalAllocatorProcess::deallocate()
{
// If no frameworks are currently registered, no work to do.
if (roles.empty()) {
return;
}
CHECK(!frameworkSorters.empty());
// In this case, `offerable` is actually the slaves and/or resources that we
// want the master to create `InverseOffer`s from.
hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
// For maintenance, we use the framework sorters to determine which frameworks
// have (1) reserved and/or (2) unreserved resources on the specified
// slaveIds. This way we only send inverse offers to frameworks that have the
// potential to lose something. We keep track of which frameworks already have
// an outstanding inverse offer for the given slave in the
// UnavailabilityStatus of the specific slave using the `offerOutstanding`
// flag. This is equivalent to the accounting we do for resources when we send
// regular offers. If we didn't keep track of outstanding offers then we would
// keep generating new inverse offers even though the framework had not
// responded yet.
foreachvalue (const Owned<Sorter>& frameworkSorter, frameworkSorters) {
foreach (const SlaveID& slaveId, allocationCandidates) {
CHECK(slaves.contains(slaveId));
Slave& slave = slaves.at(slaveId);
if (slave.maintenance.isSome()) {
// We take a reference to `maintenance` here because we intend to
// modify it and to improve readability.
Slave::Maintenance& maintenance = slave.maintenance.get();
hashmap<string, Resources> allocation =
frameworkSorter->allocation(slaveId);
foreachkey (const string& frameworkId_, allocation) {
FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);
// If this framework doesn't already have inverse offers for the
// specified slave.
if (!offerable[frameworkId].contains(slaveId)) {
// If there isn't already an outstanding inverse offer to this
// framework for the specified slave.
if (!maintenance.offersOutstanding.contains(frameworkId)) {
// Ignore in case the framework filters inverse offers for this
// slave.
//
// NOTE: Since this specific allocator implementation only sends
// inverse offers for maintenance primitives, and those are at the
// whole slave level, we only need to filter based on the
// time-out.
if (isFiltered(frameworkId, slaveId)) {
continue;
}
const UnavailableResources unavailableResources =
UnavailableResources{
Resources(),
maintenance.unavailability};
// For now we send inverse offers with empty resources when the
// inverse offer represents maintenance on the machine. In the
// future we could be more specific about the resources on the
// host, as we have the information available.
offerable[frameworkId][slaveId] = unavailableResources;
// Mark this framework as having an offer outstanding for the
// specified slave.
maintenance.offersOutstanding.insert(frameworkId);
}
}
}
}
}
}
if (offerable.empty()) {
VLOG(1) << "No inverse offers to send out!";
} else {
// Now send inverse offers to each framework.
foreachkey (const FrameworkID& frameworkId, offerable) {
inverseOfferCallback(frameworkId, offerable[frameworkId]);
}
}
}
void HierarchicalAllocatorProcess::_expire(
const FrameworkID& frameworkId,
const string& role,
const SlaveID& slaveId,
OfferFilter* offerFilter)
{
// The filter might have already been removed (e.g., if the
// framework no longer exists or in `reviveOffers()`) but not
// yet deleted (to keep the address from getting reused
// possibly causing premature expiration).
//
// Since this is a performance-sensitive piece of code,
// we use `find()` to avoid doing any redundant lookups.
auto frameworkIterator = frameworks.find(frameworkId);
if (frameworkIterator != frameworks.end()) {
Framework& framework = frameworkIterator->second;
auto roleFilters = framework.offerFilters.find(role);
if (roleFilters != framework.offerFilters.end()) {
auto agentFilters = roleFilters->second.find(slaveId);
if (agentFilters != roleFilters->second.end()) {
// Erase the filter (may be a no-op per the comment above).
agentFilters->second.erase(offerFilter);
if (agentFilters->second.empty()) {
roleFilters->second.erase(slaveId);
}
}
}
}
delete offerFilter;
}
void HierarchicalAllocatorProcess::expire(
const FrameworkID& frameworkId,
const string& role,
const SlaveID& slaveId,
OfferFilter* offerFilter)
{
dispatch(
self(),
&Self::_expire,
frameworkId,
role,
slaveId,
offerFilter);
}
void HierarchicalAllocatorProcess::expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
InverseOfferFilter* inverseOfferFilter)
{
// The filter might have already been removed (e.g., if the
// framework no longer exists or in
// HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
// keep the address from getting reused possibly causing premature
// expiration).
//
// Since this is a performance-sensitive piece of code,
// we use `find()` to avoid doing any redundant lookups.
auto frameworkIterator = frameworks.find(frameworkId);
if (frameworkIterator != frameworks.end()) {
Framework& framework = frameworkIterator->second;
auto filters = framework.inverseOfferFilters.find(slaveId);
if (filters != framework.inverseOfferFilters.end()) {
filters->second.erase(inverseOfferFilter);
if (filters->second.empty()) {
framework.inverseOfferFilters.erase(slaveId);
}
}
}
delete inverseOfferFilter;
}
bool HierarchicalAllocatorProcess::isWhitelisted(
const SlaveID& slaveId) const
{
CHECK(slaves.contains(slaveId));
const Slave& slave = slaves.at(slaveId);
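// If no whitelist is configured, all agents are considered
// whitelisted.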
return whitelist.isNone() || whitelist->contains(slave.hostname);
}
bool HierarchicalAllocatorProcess::isFiltered(
const FrameworkID& frameworkId,
const string& role,
const SlaveID& slaveId,
const Resources& resources) const
{
CHECK(frameworks.contains(frameworkId));
CHECK(slaves.contains(slaveId));
const Framework& framework = frameworks.at(frameworkId);
const Slave& slave = slaves.at(slaveId);
// TODO(mpark): Consider moving these filter logic out and into the master,
// since they are not specific to the hierarchical allocator but rather are
// global allocation constraints.
// Prevent resources on non-MULTI_ROLE agents from being allocated
// to MULTI_ROLE frameworks.
if (framework.capabilities.multiRole &&
!slave.capabilities.multiRole) {
LOG(WARNING) << "Implicitly filtering agent " << slaveId
<< " from framework " << frameworkId
<< " because the framework is MULTI_ROLE capable"
<< " but the agent is not";
return true;
}
// Prevent resources on non-HIERARCHICAL_ROLE agents from being
// allocated to hierarchical roles.
if (!slave.capabilities.hierarchicalRole && strings::contains(role, "/")) {
LOG(WARNING) << "Implicitly filtering agent " << slaveId << " from role "
<< role << " because the role is hierarchical but the agent"
<< " is not HIERARCHICAL_ROLE capable";
return true;
}
// Since this is a performance-sensitive piece of code,
// we use `find()` to avoid doing any redundant lookups.
auto roleFilters = framework.offerFilters.find(role);
if (roleFilters == framework.offerFilters.end()) {
return false;
}
auto agentFilters = roleFilters->second.find(slaveId);
if (agentFilters == roleFilters->second.end()) {
return false;
}
foreach (OfferFilter* offerFilter, agentFilters->second) {
if (offerFilter->filter(resources)) {
VLOG(1) << "Filtered offer with " << resources
<< " on agent " << slaveId
<< " for role " << role
<< " of framework " << frameworkId;
return true;
}
}
return false;
}
bool HierarchicalAllocatorProcess::isFiltered(
const FrameworkID& frameworkId,
const SlaveID& slaveId) const
{
CHECK(frameworks.contains(frameworkId));
CHECK(slaves.contains(slaveId));
const Framework& framework = frameworks.at(frameworkId);
if (framework.inverseOfferFilters.contains(slaveId)) {
foreach (InverseOfferFilter* inverseOfferFilter,
framework.inverseOfferFilters.at(slaveId)) {
if (inverseOfferFilter->filter()) {
VLOG(1) << "Filtered unavailability on agent " << slaveId
<< " for framework " << frameworkId;
return true;
}
}
}
return false;
}
bool HierarchicalAllocatorProcess::allocatable(const Resources& resources)
{
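// If no minimum allocatable resources are configured (or the
// configured list is empty), any resources are considered
// allocatable.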
if (minAllocatableResources.isNone() ||
CHECK_NOTNONE(minAllocatableResources).empty()) {
return true;
}
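// Otherwise, resources are allocatable if they contain at least
// one of the configured minimum resource quantities.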
foreach (
const ResourceQuantities& resourceQuantities,
CHECK_NOTNONE(minAllocatableResources)) {
if (resources.contains(resourceQuantities)) {
return true;
}
}
return false;
}
double HierarchicalAllocatorProcess::_resources_offered_or_allocated(
const string& resource)
{
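// Sum the offered or allocated amount of this scalar resource
// across all agents.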
double offered_or_allocated = 0;
foreachvalue (const Slave& slave, slaves) {
Option<Value::Scalar> value =
slave.getAllocated().get<Value::Scalar>(resource);
if (value.isSome()) {
offered_or_allocated += value->value();
}
}
return offered_or_allocated;
}
double HierarchicalAllocatorProcess::_resources_total(
const string& resource)
{
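// The root-level role sorter maintains the total scalar quantities
// across all agents.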
Option<Value::Scalar> total =
roleSorter->totalScalarQuantities()
.get<Value::Scalar>(resource);
return total.isSome() ? total->value() : 0;
}
double HierarchicalAllocatorProcess::_quota_allocated(
const string& role,
const string& resource)
{
if (!roleSorter->contains(role)) {
// This can occur when execution of this callback races with removal of the
// metric for a role which does not have any associated frameworks.
return 0.;
}
Option<Value::Scalar> used =
roleSorter->allocationScalarQuantities(role)
.get<Value::Scalar>(resource);
return used.isSome() ? used->value() : 0;
}
double HierarchicalAllocatorProcess::_offer_filters_active(
const string& role)
{
double result = 0;
foreachvalue (const Framework& framework, frameworks) {
if (!framework.offerFilters.contains(role)) {
continue;
}
foreachkey (const SlaveID& slaveId, framework.offerFilters.at(role)) {
result += framework.offerFilters.at(role).at(slaveId).size();
}
}
return result;
}
bool HierarchicalAllocatorProcess::isFrameworkTrackedUnderRole(
const FrameworkID& frameworkId,
const string& role) const
{
return roles.contains(role) &&
roles.at(role).contains(frameworkId);
}
void HierarchicalAllocatorProcess::trackFrameworkUnderRole(
const FrameworkID& frameworkId,
const string& role)
{
CHECK(initialized);
// If this is the first framework to subscribe to this role or to
// have resources allocated to this role, initialize state as necessary.
if (!roles.contains(role)) {
roles[role] = {};
CHECK(!roleSorter->contains(role));
roleSorter->add(role);
roleSorter->activate(role);
CHECK(!frameworkSorters.contains(role));
frameworkSorters.insert({role, Owned<Sorter>(frameworkSorterFactory())});
frameworkSorters.at(role)->initialize(fairnessExcludeResourceNames);
metrics.addRole(role);
}
CHECK(!roles.at(role).contains(frameworkId));
roles.at(role).insert(frameworkId);
CHECK(!frameworkSorters.at(role)->contains(frameworkId.value()));
frameworkSorters.at(role)->add(frameworkId.value());
}
void HierarchicalAllocatorProcess::untrackFrameworkUnderRole(
const FrameworkID& frameworkId,
const string& role)
{
CHECK(initialized);
CHECK(roles.contains(role));
CHECK(roles.at(role).contains(frameworkId));
CHECK(frameworkSorters.contains(role));
CHECK(frameworkSorters.at(role)->contains(frameworkId.value()));
roles.at(role).erase(frameworkId);
frameworkSorters.at(role)->remove(frameworkId.value());
// If no more frameworks are subscribed to this role or have resources
// allocated to this role, cleanup associated state. This is not necessary
// for correctness (roles with no registered frameworks will not be offered
// any resources), but since many different role names might be used over
// time, we want to avoid leaking resources for no-longer-used role names.
// Note that we don't remove the role from `quotaRoleSorter` if it exists
// there, since roles with a quota set still influence allocation even if
// they don't have any registered frameworks.
if (roles.at(role).empty()) {
CHECK_EQ(frameworkSorters.at(role)->count(), 0);
roles.erase(role);
roleSorter->remove(role);
frameworkSorters.erase(role);
metrics.removeRole(role);
}
}
void HierarchicalAllocatorProcess::trackReservations(
const hashmap<std::string, Resources>& reservations)
{
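// Accumulate each role's reserved resources as scalar quantities,
// since only the quantities (not the metadata) matter for this
// accounting.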
foreachpair (const string& role,
const Resources& resources, reservations) {
const Resources scalarQuantitiesToTrack =
resources.createStrippedScalarQuantity();
reservationScalarQuantities[role] += scalarQuantitiesToTrack;
}
}
void HierarchicalAllocatorProcess::untrackReservations(
const hashmap<std::string, Resources>& reservations)
{
foreachpair (const string& role,
const Resources& resources, reservations) {
CHECK(reservationScalarQuantities.contains(role));
Resources& currentReservationQuantity =
reservationScalarQuantities.at(role);
const Resources scalarQuantitiesToUntrack =
resources.createStrippedScalarQuantity();
CHECK(currentReservationQuantity.contains(scalarQuantitiesToUntrack));
currentReservationQuantity -= scalarQuantitiesToUntrack;
if (currentReservationQuantity.empty()) {
reservationScalarQuantities.erase(role);
}
}
}
bool HierarchicalAllocatorProcess::updateSlaveTotal(
const SlaveID& slaveId,
const Resources& total)
{
CHECK(slaves.contains(slaveId));
Slave& slave = slaves.at(slaveId);
const Resources oldTotal = slave.getTotal();
if (oldTotal == total) {
return false;
}
slave.updateTotal(total);
hashmap<std::string, Resources> oldReservations = oldTotal.reservations();
hashmap<std::string, Resources> newReservations = total.reservations();
if (oldReservations != newReservations) {
untrackReservations(oldReservations);
trackReservations(newReservations);
}
// Currently `roleSorter` and `quotaRoleSorter`, being the root-level
// sorters, maintain all of `slaves[slaveId].total` (or the `nonRevocable()`
// portion in the case of `quotaRoleSorter`) in their own totals (which
// don't get updated in the allocation runs or during recovery of allocated
// resources). So, we update them using the resources in `slave.total`.
roleSorter->remove(slaveId, oldTotal);
roleSorter->add(slaveId, total);
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable());
quotaRoleSorter->add(slaveId, total.nonRevocable());
return true;
}
bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const
{
// If the slave does not have a configured domain, assume it is not remote.
if (slave.domain.isNone()) {
return false;
}
// The current version of the Mesos agent refuses to start up if a
// domain is specified without also including a fault domain. That
// might change in the future, if more types of domains are added.
// For forward compatibility, we treat agents with a configured
// domain but no fault domain as having no configured domain.
if (!slave.domain->has_fault_domain()) {
return false;
}
// If the slave has a configured domain (and it has been allowed to
// register with the master), the master must also have a configured
// domain.
CHECK(domain.isSome());
// The master will not start up if configured with a domain but no
// fault domain.
CHECK(domain->has_fault_domain());
const DomainInfo::FaultDomain::RegionInfo& masterRegion =
domain->fault_domain().region();
const DomainInfo::FaultDomain::RegionInfo& slaveRegion =
slave.domain->fault_domain().region();
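// The agent is remote if its region differs from the master's region.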
return masterRegion != slaveRegion;
}
void HierarchicalAllocatorProcess::trackAllocatedResources(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Resources& allocated)
{
CHECK(slaves.contains(slaveId));
CHECK(frameworks.contains(frameworkId));
// TODO(bmahler): Calling allocations() is expensive since it has
// to construct a map. Avoid this.
foreachpair (const string& role,
const Resources& allocation,
allocated.allocations()) {
// The framework has resources allocated to this role but it may
// or may not be subscribed to the role. Either way, we need to
// track the framework under the role.
if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
trackFrameworkUnderRole(frameworkId, role);
}
CHECK(roleSorter->contains(role));
CHECK(frameworkSorters.contains(role));
CHECK(frameworkSorters.at(role)->contains(frameworkId.value()));
roleSorter->allocated(role, slaveId, allocation);
frameworkSorters.at(role)->add(slaveId, allocation);
frameworkSorters.at(role)->allocated(
frameworkId.value(), slaveId, allocation);
if (quotas.contains(role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->allocated(role, slaveId, allocation.nonRevocable());
}
}
}
void HierarchicalAllocatorProcess::untrackAllocatedResources(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Resources& allocated)
{
// TODO(mzhu): Add a `CHECK(slaves.contains(slaveId));`
// here once MESOS-621 is resolved. Ideally, `removeSlave()`
// should unallocate resources in the framework sorters.
// But currently, a slave is removed first via `removeSlave()`
// and later a call to `recoverResources()` occurs to recover
// the framework's resources.
CHECK(frameworks.contains(frameworkId));
// TODO(bmahler): Calling allocations() is expensive since it has
// to construct a map. Avoid this.
foreachpair (const string& role,
const Resources& allocation,
allocated.allocations()) {
CHECK(roleSorter->contains(role));
CHECK(frameworkSorters.contains(role));
CHECK(frameworkSorters.at(role)->contains(frameworkId.value()));
frameworkSorters.at(role)->unallocated(
frameworkId.value(), slaveId, allocation);
frameworkSorters.at(role)->remove(slaveId, allocation);
roleSorter->unallocated(role, slaveId, allocation);
if (quotas.contains(role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->unallocated(role, slaveId, allocation.nonRevocable());
}
}
}
} // namespace internal {
} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {