Fixed performance of tracking resource totals in allocator's roles tree.
Before this patch, the roles tree was tracking total resources
offered/allocated to a role as a single `Resources` objects.
In the case when each agent has a limited number of unique resources
(for example, a single persistent voulme), this resulted in poor
asymptotic complexity of allocation versus the number of agents
(O(N^2)) that was clearly observable in
`HierarchicalAllocations_BENCHMARK_Test.PersistentVolumes`.
In addition, the role tree code was violating the convention that
`Resources` belonging to different agents should never be added.
This patch implements per-agent tracking of `Resources` in the roles
tree, thus improving the performance of allocation (and getting rid of
the potentially problematic O(N^2) asymptotic) in the case of many
agents with a limited number of unique resources each.
Review: https://reviews.apache.org/r/72508
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 5fe9ffc..9e50799 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -202,6 +202,46 @@
}
+void ScalarResourceTotals::add(
+ const SlaveID& slaveID,
+ const Resources& resources)
+{
+ if (resources.scalars().empty()) {
+ // In this case, we avoid adding an entry to `scalars` to maintain the
+ // invariant that `scalars` doesn't track agents with empty resources.
+ return;
+ }
+
+ scalarsTotal -= ResourceQuantities::fromScalarResources(scalars[slaveID]);
+ scalars.at(slaveID) += resources.scalars();
+ scalarsTotal += ResourceQuantities::fromScalarResources(scalars.at(slaveID));
+}
+
+
+void ScalarResourceTotals::subtract(
+ const SlaveID& slaveID,
+ const Resources& resources)
+{
+ if (resources.scalars().empty()) {
+ // `scalars` does not track agents with empty resources, thus subtracting
+ // empty resources from an agent is valid regardless of whether its
+ // resources are tracked in `scalars`.
+ return;
+ }
+
+ CHECK_CONTAINS(scalars, slaveID);
+ CHECK_CONTAINS(scalars.at(slaveID), resources.scalars());
+
+ scalarsTotal -= ResourceQuantities::fromScalarResources(scalars.at(slaveID));
+ scalars.at(slaveID) -= resources.scalars();
+ scalarsTotal += ResourceQuantities::fromScalarResources(scalars.at(slaveID));
+
+ if (scalars.at(slaveID).empty()) {
+ scalars.erase(slaveID);
+ }
+}
+
+
Role::Role(const string& _role, Role* _parent)
: role(_role),
basename(strings::split(role, "/").back()),
@@ -315,10 +355,10 @@
break;
}
- CHECK(current->allocatedScalars_.empty())
+ CHECK(current->allocatedUnreservedNonRevocable.empty())
<< "An empty role " << current->role
<< " has non-empty allocated scalar resources: "
- << current->allocatedScalars_;
+ << current->allocatedUnreservedNonRevocable.quantities();
Role* parent = CHECK_NOTNULL(current->parent);
@@ -328,9 +368,17 @@
(*metrics)->removeRole(current->role);
}
- CHECK(current->offeredOrAllocatedScalars_.empty())
- << " role: " << role
- << " offeredOrAllocated: " << current->offeredOrAllocatedScalars_;
+ CHECK(current->offeredOrAllocatedUnreservedNonRevocable.empty())
+ << "An empty role " << current->role
+ << " has non-empty offered or allocated"
+ << " unreserved non-revocable scalar resources: "
+ << current->offeredOrAllocatedUnreservedNonRevocable.quantities();
+
+ CHECK(current->offeredOrAllocatedReserved.empty())
+ << "An empty role " << current->role
+ << " has non-empty offered or allocated reserved scalar resources: "
+ << current->offeredOrAllocatedReserved.quantities();
+
roles_.erase(current->role);
current = parent;
@@ -385,28 +433,32 @@
}
-void RoleTree::trackAllocated(const Resources& resources_)
+void RoleTree::trackAllocated(
+ const SlaveID& slaveId,
+ const Resources& resources_)
{
foreachpair (
const string& role,
const Resources& resources,
- resources_.scalars().allocations()) {
+ resources_.scalars().unreserved().nonRevocable().allocations()) {
applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) {
- current->allocatedScalars_ += resources;
+ current->allocatedUnreservedNonRevocable.add(slaveId, resources);
updateQuotaConsumedMetric(current);
});
}
}
-void RoleTree::untrackAllocated(const Resources& resources_)
+void RoleTree::untrackAllocated(
+ const SlaveID& slaveId,
+ const Resources& resources_)
{
foreachpair (
const string& role,
const Resources& resources,
- resources_.scalars().allocations()) {
+ resources_.scalars().unreserved().nonRevocable().allocations()) {
applyToRoleAndAncestors(CHECK_NOTNONE(get_(role)), [&](Role* current) {
- current->allocatedScalars_ -= resources;
+ current->allocatedUnreservedNonRevocable.subtract(slaveId, resources);
updateQuotaConsumedMetric(current);
});
}
@@ -453,7 +505,9 @@
}
-void RoleTree::trackOfferedOrAllocated(const Resources& resources_)
+void RoleTree::trackOfferedOrAllocated(
+ const SlaveID& slaveId,
+ const Resources& resources_)
{
// TODO(mzhu): avoid building a map by traversing `resources`
// and look for the allocation role of individual resource.
@@ -464,14 +518,20 @@
const Resources& resources,
resources_.scalars().allocations()) {
applyToRoleAndAncestors(
- CHECK_NOTNONE(get_(role)), [&resources](Role* current) {
- current->offeredOrAllocatedScalars_ += resources;
- });
+ CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) {
+ current->offeredOrAllocatedReserved.add(
+ slaveId, resources.reserved());
+
+ current->offeredOrAllocatedUnreservedNonRevocable.add(
+ slaveId, resources.unreserved().nonRevocable());
+ });
}
}
-void RoleTree::untrackOfferedOrAllocated(const Resources& resources_)
+void RoleTree::untrackOfferedOrAllocated(
+ const SlaveID& slaveId,
+ const Resources& resources_)
{
// TODO(mzhu): avoid building a map by traversing `resources`
// and look for the allocation role of individual resource.
@@ -482,12 +542,13 @@
const Resources& resources,
resources_.scalars().allocations()) {
applyToRoleAndAncestors(
- CHECK_NOTNONE(get_(role)), [&resources](Role* current) {
- CHECK_CONTAINS(current->offeredOrAllocatedScalars_, resources)
- << " Role: " << current->role
- << " offeredOrAllocated: " << current->offeredOrAllocatedScalars_;
- current->offeredOrAllocatedScalars_ -= resources;
- });
+ CHECK_NOTNONE(get_(role)), [&resources, &slaveId](Role* current) {
+ current->offeredOrAllocatedReserved.subtract(
+ slaveId, resources.reserved());
+
+ current->offeredOrAllocatedUnreservedNonRevocable.subtract(
+ slaveId, resources.unreserved().nonRevocable());
+ });
}
}
@@ -503,8 +564,14 @@
writer->field("limits", role->quota_.limits);
writer->field(
"reservation_quantities", role->reservationScalarQuantities_);
+
writer->field(
- "offered_or_allocated_scalars", role->offeredOrAllocatedScalars_);
+ "offered_or_allocated_reserved_quantities",
+ role->offeredOrAllocatedReserved.quantities());
+
+ writer->field(
+ "offered_or_allocated_unreserved_nonrevocable_quantities",
+ role->offeredOrAllocatedUnreservedNonRevocable.quantities());
writer->field("frameworks", [&](JSON::ArrayWriter* writer) {
foreach (const FrameworkID& id, role->frameworks_) {
@@ -694,7 +761,7 @@
// resources, so we only need to track them in the sorters.
trackAllocatedResources(slaveId, frameworkId, resources);
- roleTree.trackAllocated(resources);
+ roleTree.trackAllocated(slaveId, resources);
}
LOG(INFO) << "Added framework " << frameworkId;
@@ -935,7 +1002,7 @@
trackAllocatedResources(slaveId, frameworkId, allocation);
- roleTree.trackAllocated(allocation);
+ roleTree.trackAllocated(slaveId, allocation);
}
// If we have just a number of recovered agents, we cannot distinguish
@@ -975,7 +1042,7 @@
// untrackAllocatedResources() potentially removes allocation roles, thus
// we need to untrack actually allocated resources in the roles tree first.
- roleTree.untrackAllocated(slave.totalAllocated);
+ roleTree.untrackAllocated(slaveId, slave.totalAllocated);
// Untrack resources in roleTree and sorter.
foreachpair (
@@ -1233,8 +1300,8 @@
slave.increaseAvailable(frameworkId, offeredResources);
slave.decreaseAvailable(frameworkId, updatedOfferedResources);
- roleTree.untrackOfferedOrAllocated(offeredResources);
- roleTree.trackOfferedOrAllocated(updatedOfferedResources);
+ roleTree.untrackOfferedOrAllocated(slaveId, offeredResources);
+ roleTree.trackOfferedOrAllocated(slaveId, updatedOfferedResources);
// Update the allocation in the framework sorter.
frameworkSorter->update(
@@ -1512,7 +1579,7 @@
const Resources& resources)
{
CHECK_NOTNONE(getSlave(slaveId))->totalAllocated += resources;
- roleTree.trackAllocated(resources);
+ roleTree.trackAllocated(slaveId, resources);
}
@@ -1534,7 +1601,7 @@
if (isAllocated && slave.isSome()) {
CHECK_CONTAINS((*slave)->totalAllocated, resources);
(*slave)->totalAllocated -= resources;
- roleTree.untrackAllocated(resources);
+ roleTree.untrackAllocated(slaveId, resources);
}
Option<Framework*> framework = getFramework(frameworkId);
@@ -1939,10 +2006,7 @@
// these as metrics.
if (r->quota() != DEFAULT_QUOTA) {
logHeadroomInfo = true;
- rolesConsumedQuota[r->role] +=
- r->reservationScalarQuantities() +
- ResourceQuantities::fromScalarResources(
- r->offeredOrAllocatedScalars().unreserved().nonRevocable());
+ rolesConsumedQuota[r->role] += r->quotaOfferedOrConsumed();
}
}
@@ -1988,8 +2052,7 @@
// unallocated reservations = total reservations - allocated reservations
availableHeadroom -=
roleTree.root()->reservationScalarQuantities() -
- ResourceQuantities::fromScalarResources(
- roleTree.root()->offeredOrAllocatedScalars().reserved());
+ roleTree.root()->offeredOrAllocatedReservedScalarQuantities();
// Subtract revocable resources.
foreachvalue (const Slave& slave, slaves) {
@@ -3066,7 +3129,7 @@
CHECK_CONTAINS(*frameworkSorter, frameworkId.value())
<< " for role " << role;
- roleTree.trackOfferedOrAllocated(allocation);
+ roleTree.trackOfferedOrAllocated(slaveId, allocation);
roleSorter->allocated(role, slaveId, allocation);
frameworkSorter->allocated(
@@ -3100,7 +3163,7 @@
CHECK_CONTAINS(*frameworkSorter, frameworkId.value())
<< "for role " << role;
- roleTree.untrackOfferedOrAllocated(allocation);
+ roleTree.untrackOfferedOrAllocated(slaveId, allocation);
frameworkSorter->unallocated(frameworkId.value(), slaveId, allocation);
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 6454cda..e444e47 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -112,6 +112,28 @@
};
+// Helper for tracking cross-agent scalar resource totals.
+// Needed because directly summing Resources across agents has
+// prohibitively expensive time complexity: O(N^2) vs the number of agents,
+// and also violates the convention that Resources belonging to different agents
+// should not be added.
+class ScalarResourceTotals
+{
+public:
+ // These methods implicitly filter out non-scalars from the inputs, thus
+ // the caller is not obliged to ensure that `resources` contains only scalars.
+ void add(const SlaveID& slaveID, const Resources& resources);
+ void subtract(const SlaveID& slaveID, const Resources& resources);
+
+ bool empty() const { return scalars.empty(); }
+ ResourceQuantities quantities() const { return scalarsTotal; }
+
+private:
+ hashmap<SlaveID, Resources> scalars;
+ ResourceQuantities scalarsTotal;
+};
+
+
class Role
{
public:
@@ -122,19 +144,24 @@
return reservationScalarQuantities_;
}
- const Resources& offeredOrAllocatedScalars() const
+ ResourceQuantities offeredOrAllocatedReservedScalarQuantities() const
{
- return offeredOrAllocatedScalars_;
+ return offeredOrAllocatedReserved.quantities();
}
const hashset<FrameworkID>& frameworks() const { return frameworks_; }
const Quota& quota() const { return quota_; }
+ ResourceQuantities quotaOfferedOrConsumed() const
+ {
+ return offeredOrAllocatedUnreservedNonRevocable.quantities() +
+ reservationScalarQuantities_;
+ }
+
ResourceQuantities quotaConsumed() const
{
- return ResourceQuantities::fromScalarResources(
- allocatedScalars_.unreserved().nonRevocable()) +
+ return allocatedUnreservedNonRevocable.quantities() +
reservationScalarQuantities_;
}
@@ -187,12 +214,13 @@
// resources offered or allocated to this role.
hashset<FrameworkID> frameworks_;
- // Total allocated or offered scalar resources to this role, including
- // meta data. This field dose not affect role's lifecycle. However, since
- // any offered or allocated resources should be tied to a framework,
- // an empty role (that has no registered framework) must have
- // empty offeredOrAllocated resources.
- Resources offeredOrAllocatedScalars_;
+ // Totals tracker for unreserved non-revocable offered/allocated resources.
+ // Note that since any offered or allocated resources should be tied to
+ // a framework, an empty role (that has no registered framework) must have
+ // this total empty.
+ ScalarResourceTotals offeredOrAllocatedUnreservedNonRevocable;
+
+ ScalarResourceTotals offeredOrAllocatedReserved;
// Aggregated reserved scalar resource quantities on all agents tied to this
// role, if any. This includes both its own reservations as well as
@@ -200,9 +228,9 @@
// Note that non-scalar resources, such as ports, are excluded.
ResourceQuantities reservationScalarQuantities_;
- // Scalar resources actually allocated (i.e. used for launching tasks) to this
- // role and any of its subroles, both reserved and unreserved, on all agents.
- Resources allocatedScalars_;
+ // Totals tracker for unreserved non-revocable resources actually allocated
+ // (i.e. used for launching tasks) to this role and any of its subroles.
+ ScalarResourceTotals allocatedUnreservedNonRevocable;
hashmap<std::string, Role*> children_;
};
@@ -240,9 +268,9 @@
void trackReservations(const Resources& resources);
void untrackReservations(const Resources& resources);
- // We keep track of allocated resources which are actially used by frameworks.
- void trackAllocated(const Resources& resources);
- void untrackAllocated(const Resources& resources);
+ // We keep track of allocated resources which are actually used by frameworks.
+ void trackAllocated(const SlaveID& slaveId, const Resources& resources);
+ void untrackAllocated(const SlaveID& slaveId, const Resources& resources);
void trackFramework(
const FrameworkID& frameworkId, const std::string& role);
@@ -253,8 +281,13 @@
void updateWeight(const std::string& role, double weight);
- void trackOfferedOrAllocated(const Resources& resources);
- void untrackOfferedOrAllocated(const Resources& resources);
+ void trackOfferedOrAllocated(
+ const SlaveID& slaveId,
+ const Resources& resources);
+
+ void untrackOfferedOrAllocated(
+ const SlaveID& slaveId,
+ const Resources& resources);
// Dump the role tree state in JSON format for debugging.
std::string toJSON() const;