| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <atomic> |
| #include <iostream> |
| #include <set> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <gmock/gmock.h> |
| |
| #include <mesos/allocator/allocator.hpp> |
| |
| #include <process/clock.hpp> |
| #include <process/future.hpp> |
| #include <process/gtest.hpp> |
| #include <process/queue.hpp> |
| |
| #include <stout/duration.hpp> |
| #include <stout/foreach.hpp> |
| #include <stout/gtest.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/json.hpp> |
| #include <stout/os.hpp> |
| #include <stout/stopwatch.hpp> |
| #include <stout/utils.hpp> |
| |
| #include "master/constants.hpp" |
| #include "master/flags.hpp" |
| |
| #include "master/allocator/mesos/hierarchical.hpp" |
| |
| #include "tests/allocator.hpp" |
| #include "tests/mesos.hpp" |
| #include "tests/utils.hpp" |
| |
| using mesos::internal::master::MIN_CPUS; |
| using mesos::internal::master::MIN_MEM; |
| |
| using mesos::internal::master::allocator::HierarchicalDRFAllocator; |
| |
| using mesos::internal::protobuf::createLabel; |
| |
| using mesos::allocator::Allocator; |
| |
| using process::Clock; |
| using process::Future; |
| |
| using std::atomic; |
| using std::cout; |
| using std::endl; |
| using std::map; |
| using std::set; |
| using std::string; |
| using std::vector; |
| |
| using testing::WithParamInterface; |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
| |
// Captures a single invocation of the allocator's offer callback:
// the framework that was selected, and the resources offered to it,
// keyed by the agent they reside on.
struct Allocation
{
  FrameworkID frameworkId;
  hashmap<SlaveID, Resources> resources;
};
| |
| |
| // Creates a "ports(*)" resource for the given ranges. |
| static Resource createPorts(const ::mesos::Value::Ranges& ranges) |
| { |
| Value value; |
| value.set_type(Value::RANGES); |
| value.mutable_ranges()->CopyFrom(ranges); |
| |
| Resource resource; |
| resource.set_role("*"); |
| resource.set_name("ports"); |
| resource.set_type(Value::RANGES); |
| resource.mutable_ranges()->CopyFrom(value.ranges()); |
| |
| return resource; |
| } |
| |
| |
| // Fragments the given range bounds into a number of subranges. |
| // Returns an Error if 'numRanges' is too large. E.g. |
| // |
| // [1-10], 1 -> [1-10] |
| // [1-10], 2 -> [1-1,3-10] |
| // [1-10], 3 -> [1-1,3-3,5-10] |
| // [1-10], 4 -> [1-1,3-3,5-5,7-10] |
| // [1-10], 5 -> [1-1,3-3,5-5,7-7,9-10] |
| // [1-10], 6 -> Error |
| // |
| static Try<::mesos::Value::Ranges> fragment( |
| const ::mesos::Value::Range& bounds, |
| size_t numRanges) |
| { |
| uint64_t numValues = bounds.end() - bounds.begin() + 1; |
| |
| // Compute the max number of ranges. |
| // |
| // If `numValues` is even, then the maximum number of ranges is |
| // `numValues / 2`: |
| // [1-2] -> 2 values, maximum 1 range: [1-2] |
| // [1-4] -> 4 values, maximum 2 ranges: [1-1,3-4] |
| // [1-6] -> 6 values, maximum 3 ranges: [1-1,3-3,5-6] |
| // |
| // If `numValues` is odd, then the maximum number of ranges is |
| // `(numValues + 1) / 2`: |
| // [1-1] -> 1 values, maximum 1 range: [1-1] |
| // [1-3] -> 3 values, maximum 2 ranges: [1-1,3-3] |
| // [1-5] -> 5 values, maximum 3 ranges: [1-1,3-3,5-5] |
| // |
| uint64_t maxRanges; |
| if (numValues % 2 == 0) { |
| maxRanges = numValues / 2; |
| } else { |
| maxRanges = (numValues + 1) / 2; |
| } |
| |
| if (numRanges > maxRanges) { |
| return Error("Requested more distinct ranges than possible"); |
| } |
| |
| // See the documentation above for the fragmentation technique. |
| // We fragment from the front of the bounds until we have the |
| // desired number of ranges. |
| ::mesos::Value::Ranges ranges; |
| ranges.mutable_range()->Reserve(numRanges); |
| |
| for (size_t i = 0; i < numRanges; ++i) { |
| Value::Range* range = ranges.add_range(); |
| |
| range->set_begin(bounds.begin() + (i * 2)); |
| range->set_end(range->begin()); |
| } |
| |
| // Make sure the last range covers the end of the bounds. |
| if (!ranges.range().empty()) { |
| ranges.mutable_range()->rbegin()->set_end(bounds.end()); |
| } |
| |
| return ranges; |
| } |
| |
| |
| static ::mesos::Value::Range createRange(uint64_t begin, uint64_t end) |
| { |
| ::mesos::Value::Range range; |
| range.set_begin(begin); |
| range.set_end(end); |
| return range; |
| } |
| |
| |
// Captures a single invocation of the allocator's inverse offer
// callback: the framework the inverse offer targets, and the
// unavailable resources being requested back, keyed by agent.
struct Deallocation
{
  FrameworkID frameworkId;
  hashmap<SlaveID, UnavailableResources> resources;
};
| |
| |
// Base fixture for hierarchical allocator tests. It owns a
// `HierarchicalDRFAllocator` instance and, by default, wires the
// allocator's offer and inverse offer callbacks to the `allocations`
// and `deallocations` queues so tests can await allocator decisions.
class HierarchicalAllocatorTestBase : public ::testing::Test
{
protected:
  HierarchicalAllocatorTestBase()
    : allocator(createAllocator<HierarchicalDRFAllocator>()),
      nextSlaveId(1),
      nextFrameworkId(1) {}

  ~HierarchicalAllocatorTestBase()
  {
    delete allocator;
  }

  // Initializes the allocator with the given master flags. If a
  // callback is not supplied, a default is installed that pushes
  // each (inverse) offer onto the corresponding queue below.
  void initialize(
      const master::Flags& _flags = master::Flags(),
      Option<lambda::function<
          void(const FrameworkID&,
               const hashmap<SlaveID, Resources>&)>> offerCallback = None(),
      Option<lambda::function<
          void(const FrameworkID&,
               const hashmap<SlaveID, UnavailableResources>&)>>
        inverseOfferCallback = None())
  {
    flags = _flags;

    if (offerCallback.isNone()) {
      // Default: record every offer as an `Allocation` on the queue.
      offerCallback =
        [this](const FrameworkID& frameworkId,
               const hashmap<SlaveID, Resources>& resources) {
          Allocation allocation;
          allocation.frameworkId = frameworkId;
          allocation.resources = resources;

          allocations.put(allocation);
        };
    }

    if (inverseOfferCallback.isNone()) {
      // Default: record every inverse offer as a `Deallocation`.
      inverseOfferCallback =
        [this](const FrameworkID& frameworkId,
               const hashmap<SlaveID, UnavailableResources>& resources) {
          Deallocation deallocation;
          deallocation.frameworkId = frameworkId;
          deallocation.resources = resources;

          deallocations.put(deallocation);
        };
    }

    allocator->initialize(
        flags.allocation_interval,
        offerCallback.get(),
        inverseOfferCallback.get(),
        {},
        flags.fair_sharing_excluded_resource_names);
  }

  // Creates a `SlaveInfo` with the given resources (in Mesos resource
  // string syntax) and a unique, sequentially numbered agent id that
  // doubles as the hostname.
  SlaveInfo createSlaveInfo(const string& resources)
  {
    SlaveID slaveId;
    slaveId.set_value("agent" + stringify(nextSlaveId++));

    SlaveInfo slave;
    *(slave.mutable_resources()) = Resources::parse(resources).get();
    *(slave.mutable_id()) = slaveId;
    slave.set_hostname(slaveId.value());

    return slave;
  }

  // Creates a `FrameworkInfo` in the given role with a unique,
  // sequentially numbered name that is also used as the framework id,
  // optionally setting the given capabilities.
  FrameworkInfo createFrameworkInfo(
      const string& role,
      const vector<FrameworkInfo::Capability::Type>& capabilities = {})
  {
    FrameworkInfo frameworkInfo;
    frameworkInfo.set_user("user");
    frameworkInfo.set_name("framework" + stringify(nextFrameworkId++));
    frameworkInfo.mutable_id()->set_value(frameworkInfo.name());
    frameworkInfo.set_role(role);

    foreach (const FrameworkInfo::Capability::Type& capability, capabilities) {
      frameworkInfo.add_capabilities()->set_type(capability);
    }

    return frameworkInfo;
  }

  // Creates a quota guaranteeing the given resources to the role.
  static Quota createQuota(const string& role, const string& resources)
  {
    mesos::quota::QuotaInfo quotaInfo;
    quotaInfo.set_role(role);
    quotaInfo.mutable_guarantee()->CopyFrom(Resources::parse(resources).get());

    return Quota{quotaInfo};
  }

  // Parses the given resource and marks it revocable by populating
  // its (empty) `RevocableInfo` field.
  Resources createRevocableResources(
      const string& name,
      const string& value,
      const string& role = "*")
  {
    Resource resource = Resources::parse(name, value, role).get();
    resource.mutable_revocable();
    return resource;
  }

  // Creates a `WeightInfo` assigning the given weight to the role.
  static WeightInfo createWeightInfo(const string& role, double weight)
  {
    WeightInfo weightInfo;
    weightInfo.set_role(role);
    weightInfo.set_weight(weight);

    return weightInfo;
  }

protected:
  master::Flags flags;

  // Owned by this fixture; deleted in the destructor.
  Allocator* allocator;

  // Queues populated by the default callbacks installed in
  // `initialize()`; tests await allocator decisions through these.
  process::Queue<Allocation> allocations;
  process::Queue<Deallocation> deallocations;

private:
  // Counters used to generate unique agent and framework ids.
  int nextSlaveId;
  int nextFrameworkId;
};
| |
| |
| class HierarchicalAllocatorTest : public HierarchicalAllocatorTestBase {}; |
| |
| |
| // TODO(bmahler): These tests were transformed directly from |
| // integration tests into unit tests. However, these tests |
| // should be simplified even further to each test a single |
| // expected behavior, at which point we can have more tests |
| // that are each very small. |
| |
| |
| // Checks that the DRF allocator implements the DRF algorithm |
| // correctly. The test accomplishes this by adding frameworks and |
| // slaves one at a time to the allocator, making sure that each time |
| // a new slave is added all of its resources are offered to whichever |
| // framework currently has the smallest share. Checking for proper DRF |
| // logic when resources are returned, frameworks exit, etc. is handled |
| // by SorterTest.DRFSorter. |
| TEST_F(HierarchicalAllocatorTest, UnreservedDRF) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| // Total cluster resources will become cpus=2, mem=1024. |
| SlaveInfo slave1 = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), {}); |
| |
| // framework1 will be offered all of slave1's resources since it is |
| // the only framework running so far. |
| FrameworkInfo framework1 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework1.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources)); |
| |
| // role1 share = 1 (cpus=2, mem=1024) |
| // framework1 share = 1 |
| |
| FrameworkInfo framework2 = createFrameworkInfo("role2"); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| // Total cluster resources will become cpus=3, mem=1536: |
| // role1 share = 0.66 (cpus=2, mem=1024) |
| // framework1 share = 1 |
| // role2 share = 0 |
| // framework2 share = 0 |
| SlaveInfo slave2 = createSlaveInfo("cpus:1;mem:512;disk:0"); |
| allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {}); |
| |
| // framework2 will be offered all of slave2's resources since role2 |
| // has the lowest user share, and framework2 is its only framework. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework2.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources)); |
| |
| // role1 share = 0.67 (cpus=2, mem=1024) |
| // framework1 share = 1 |
| // role2 share = 0.33 (cpus=1, mem=512) |
| // framework2 share = 1 |
| |
| // Total cluster resources will become cpus=6, mem=3584: |
| // role1 share = 0.33 (cpus=2, mem=1024) |
| // framework1 share = 1 |
| // role2 share = 0.16 (cpus=1, mem=512) |
| // framework2 share = 1 |
| SlaveInfo slave3 = createSlaveInfo("cpus:3;mem:2048;disk:0"); |
| allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), {}); |
| |
| // framework2 will be offered all of slave3's resources since role2 |
| // has the lowest share. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework2.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources)); |
| |
| // role1 share = 0.33 (cpus=2, mem=1024) |
| // framework1 share = 1 |
| // role2 share = 0.71 (cpus=4, mem=2560) |
| // framework2 share = 1 |
| |
| FrameworkInfo framework3 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework3.id(), framework3, {}); |
| |
| // Total cluster resources will become cpus=10, mem=7680: |
| // role1 share = 0.2 (cpus=2, mem=1024) |
| // framework1 share = 1 |
| // framework3 share = 0 |
| // role2 share = 0.4 (cpus=4, mem=2560) |
| // framework2 share = 1 |
| SlaveInfo slave4 = createSlaveInfo("cpus:4;mem:4096;disk:0"); |
| allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), {}); |
| |
| // framework3 will be offered all of slave4's resources since role1 |
| // has the lowest user share, and framework3 has the lowest share of |
| // role1's frameworks. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework3.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources)); |
| |
| // role1 share = 0.67 (cpus=6, mem=5120) |
| // framework1 share = 0.33 (cpus=2, mem=1024) |
| // framework3 share = 0.8 (cpus=4, mem=4096) |
| // role2 share = 0.4 (cpus=4, mem=2560) |
| // framework2 share = 1 |
| |
| FrameworkInfo framework4 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework4.id(), framework4, {}); |
| |
| // Total cluster resources will become cpus=11, mem=8192 |
| // role1 share = 0.63 (cpus=6, mem=5120) |
| // framework1 share = 0.33 (cpus=2, mem=1024) |
| // framework3 share = 0.8 (cpus=4, mem=4096) |
| // framework4 share = 0 |
| // role2 share = 0.36 (cpus=4, mem=2560) |
| // framework2 share = 1 |
| SlaveInfo slave5 = createSlaveInfo("cpus:1;mem:512;disk:0"); |
| allocator->addSlave(slave5.id(), slave5, None(), slave5.resources(), {}); |
| |
| // Even though framework4 doesn't have any resources, role2 has a |
| // lower share than role1, so framework2 receives slave5's resources. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework2.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave5.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // This test ensures that reserved resources do affect the sharing across roles. |
| TEST_F(HierarchicalAllocatorTest, ReservedDRF) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave1 = createSlaveInfo( |
| "cpus:1;mem:512;disk:0;" |
| "cpus(role1):100;mem(role1):1024;disk(role1):0"); |
| allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), {}); |
| |
| // framework1 will be offered all of the resources. |
| FrameworkInfo framework1 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework1.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources)); |
| |
| FrameworkInfo framework2 = createFrameworkInfo("role2"); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| // framework2 will be allocated the new resources. |
| SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:512;disk:0"); |
| allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {}); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework2.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Since `framework1` has more resources allocated to it than `framework2`, |
| // We expect `framework2` to receive this agent's resources. |
| SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:512;disk:0"); |
| allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), {}); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework2.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Now add another framework in role1. Since the reserved resources |
| // should be allocated fairly between frameworks within a role, we |
| // expect framework3 to receive the next allocation of role1 |
| // resources. |
| FrameworkInfo framework3 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework3.id(), framework3, {}); |
| |
| SlaveInfo slave4 = createSlaveInfo( |
| "cpus(role1):2;mem(role1):1024;disk(role1):0"); |
| allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), {}); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework3.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
// Tests that the fairness exclusion list works as expected. The test
// accomplishes this by adding frameworks and slaves one at a time to
// the allocator with exclude resources, making sure that each time a
// new slave is added all of its resources are offered to whichever
// framework currently has the smallest share. Checking for proper DRF
// logic when resources are returned, frameworks exit, etc., is handled
// by SorterTest.DRFSorter.
TEST_F(HierarchicalAllocatorTest, DRFWithFairnessExclusion)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  // Specify that `gpus` should not be fairly shared.
  master::Flags flags_;
  flags_.fair_sharing_excluded_resource_names = set<string>({"gpus"});

  initialize(flags_);

  // Total cluster resources will become cpus=2, mem=1024, gpus=1.
  SlaveInfo agent1 = createSlaveInfo("cpus:2;mem:1024;disk:0;gpus:1");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  // framework1 will be offered all of agent1's resources since it is
  // the only framework running so far. It must advertise the GPU
  // capability to be offered the `gpus` resource.
  FrameworkInfo framework1 = createFrameworkInfo(
      "role1", {FrameworkInfo::Capability::GPU_RESOURCES});

  allocator->addFramework(framework1.id(), framework1, {});

  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation->frameworkId);
  EXPECT_EQ(agent1.resources(), Resources::sum(allocation->resources));

  // role1 share = 1 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework1 share = 1

  FrameworkInfo framework2 = createFrameworkInfo("role2");
  allocator->addFramework(framework2.id(), framework2, {});

  // Total cluster resources will become cpus=3, mem=1536, (ignored) gpus=1
  // role1 share = 0.66 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework1 share = 1
  // role2 share = 0
  //   framework2 share = 0
  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // framework2 will be offered all of agent2's resources since role2
  // has the lowest user share, and framework2 is its only framework.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation->frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation->resources));

  // role1 share = 0.67 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework1 share = 1
  // role2 share = 0.33 (cpus=1, mem=512)
  //   framework2 share = 1

  // Total cluster resources will become cpus=6, mem=3584, (ignored) gpus=1
  // role1 share = 0.33 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework1 share = 1
  // role2 share = 0.16 (cpus=1, mem=512)
  //   framework2 share = 1
  SlaveInfo agent3 = createSlaveInfo("cpus:3;mem:2048;disk:0");
  allocator->addSlave(agent3.id(), agent3, None(), agent3.resources(), {});

  // framework2 will be offered all of agent3's resources since role2
  // has the lowest share.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation->frameworkId);
  EXPECT_EQ(agent3.resources(), Resources::sum(allocation->resources));

  // role1 share = 0.33 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework1 share = 1
  // role2 share = 0.71 (cpus=4, mem=2560)
  //   framework2 share = 1

  FrameworkInfo framework3 = createFrameworkInfo("role1");
  allocator->addFramework(framework3.id(), framework3, {});

  // Total cluster resources will become cpus=10, mem=7680, (ignored) gpus=1
  // role1 share = 0.2 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework1 share = 1
  //   framework3 share = 0
  // role2 share = 0.4 (cpus=4, mem=2560)
  //   framework2 share = 1
  SlaveInfo agent4 = createSlaveInfo("cpus:4;mem:4096;disk:0");
  allocator->addSlave(agent4.id(), agent4, None(), agent4.resources(), {});

  // framework3 will be offered all of agent4's resources since role1
  // has the lowest user share, and framework3 has the lowest share of
  // role1's frameworks.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework3.id(), allocation->frameworkId);
  EXPECT_EQ(agent4.resources(), Resources::sum(allocation->resources));

  // role1 share = 0.67 (cpus=6, mem=5120, (ignored) gpus=1)
  //   framework1 share = 0.33 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework3 share = 0.8 (cpus=4, mem=4096)
  // role2 share = 0.4 (cpus=4, mem=2560)
  //   framework2 share = 1

  FrameworkInfo framework4 = createFrameworkInfo("role1");
  allocator->addFramework(framework4.id(), framework4, {});

  // Total cluster resources will become cpus=11, mem=8192, (ignored) gpus=1
  // role1 share = 0.63 (cpus=6, mem=5120, (ignored) gpus=1)
  //   framework1 share = 0.33 (cpus=2, mem=1024, (ignored) gpus=1)
  //   framework3 share = 0.8 (cpus=4, mem=4096)
  //   framework4 share = 0
  // role2 share = 0.36 (cpus=4, mem=2560)
  //   framework2 share = 1
  SlaveInfo agent5 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent5.id(), agent5, None(), agent5.resources(), {});

  // Even though framework4 doesn't have any resources, role2 has a
  // lower share than role1, so framework2 receives agent5's resources.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation->frameworkId);
  EXPECT_EQ(agent5.resources(), Resources::sum(allocation->resources));
}
| |
| |
// This test ensures that an offer filter larger than the
// allocation interval effectively filters out resources.
TEST_F(HierarchicalAllocatorTest, OfferFilter)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  // We put both frameworks into the same role, but we could also
  // have had separate roles; this should not influence the test.
  const string ROLE{"role"};

  initialize();

  FrameworkInfo framework = createFrameworkInfo(ROLE);
  allocator->addFramework(framework.id(), framework, {});

  SlaveInfo agent = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});

  // `framework` will be offered all of `agent` resources
  // because it is the only framework in the cluster.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources));

  // Now `framework` declines the offer and sets a filter
  // with the duration greater than the allocation interval.
  Duration filterTimeout = flags.allocation_interval * 2;
  Filters offerFilter;
  offerFilter.set_refuse_seconds(filterTimeout.secs());

  allocator->recoverResources(
      framework.id(),
      agent.id(),
      allocation.get().resources.get(agent.id()).get(),
      offerFilter);

  // Ensure the offer filter timeout is set before advancing the clock.
  Clock::settle();

  // The filter should now be reflected in the allocator's metrics as
  // an active offer filter for the role.
  JSON::Object metrics = Metrics();

  string activeOfferFilters =
    "allocator/mesos/offer_filters/roles/" + ROLE + "/active";
  EXPECT_EQ(1, metrics.values[activeOfferFilters]);

  // Trigger a batch allocation.
  Clock::advance(flags.allocation_interval);
  Clock::settle();

  // There should be no allocation due to the offer filter.
  allocation = allocations.get();
  ASSERT_TRUE(allocation.isPending());

  // Ensure the offer filter times out (2x the allocation interval)
  // and the next batch allocation occurs.
  Clock::advance(flags.allocation_interval);
  Clock::settle();

  // The next batch allocation should offer resources to `framework`.
  AWAIT_READY(allocation);
  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources));

  // With the filter expired, no active offer filter should remain.
  metrics = Metrics();

  EXPECT_EQ(0, metrics.values[activeOfferFilters]);
}
| |
| |
// This test ensures that an offer filter is not removed earlier than
// the next batch allocation. See MESOS-4302 for more information.
//
// NOTE: If we update the code to allocate upon resource recovery
// (MESOS-3078), this test should still pass in that the small offer
// filter timeout should lead to the next allocation for the agent
// applying the filter.
TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  // We put both frameworks into the same role, but we could also
  // have had separate roles; this should not influence the test.
  const string ROLE{"role"};

  // Explicitly set the allocation interval to make sure
  // it is greater than the offer filter timeout.
  master::Flags flags_;
  flags_.allocation_interval = Minutes(1);

  initialize(flags_);

  FrameworkInfo framework1 = createFrameworkInfo(ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  FrameworkInfo framework2 = createFrameworkInfo(ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // `agent1` is added with all of its resources already allocated
  // to `framework1`, establishing an initial share imbalance.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(
      agent1.id(),
      agent1,
      None(),
      agent1.resources(),
      {std::make_pair(framework1.id(), agent1.resources())});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  // Total cluster resources (1 agent): cpus=1, mem=512.
  // ROLE1 share = 1 (cpus=1, mem=512)
  //   framework1 share = 1 (cpus=1, mem=512)
  //   framework2 share = 0

  // Add one more agent with some free resources.
  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // Process the allocation triggered by the agent addition.
  Clock::settle();

  // `framework2` will be offered all of `agent2` resources
  // because its share (0) is smaller than `framework1`.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources (2 agents): cpus=2, mem=1024.
  // ROLE1 share = 1 (cpus=2, mem=1024)
  //   framework1 share = 0.5 (cpus=1, mem=512)
  //   framework2 share = 0.5 (cpus=1, mem=512)

  // Now `framework2` declines the offer and sets a filter
  // for 1 second, which is less than the allocation interval.
  Duration filterTimeout = Seconds(1);
  ASSERT_GT(flags.allocation_interval, filterTimeout);

  Filters offerFilter;
  offerFilter.set_refuse_seconds(filterTimeout.secs());

  allocator->recoverResources(
      framework2.id(),
      agent2.id(),
      allocation.get().resources.get(agent2.id()).get(),
      offerFilter);

  // Total cluster resources (2 agents): cpus=2, mem=1024.
  // ROLE1 share = 0.5 (cpus=1, mem=512)
  //   framework1 share = 1 (cpus=1, mem=512)
  //   framework2 share = 0

  // The offer filter times out. Since the allocator ensures that
  // offer filters are removed after at least one batch allocation
  // has occurred, we expect that after the timeout elapses, the
  // filter will remain active for the next allocation and the
  // resources are allocated to `framework1`.
  Clock::advance(filterTimeout);
  Clock::settle();

  // Trigger a batch allocation.
  Clock::advance(flags.allocation_interval);
  Clock::settle();

  // Since the filter is applied, resources are offered to `framework1`
  // even though its share is greater than `framework2`.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources (2 agents): cpus=2, mem=1024.
  // ROLE1 share = 1 (cpus=2, mem=1024)
  //   framework1 share = 1 (cpus=2, mem=1024)
  //   framework2 share = 0

  // The filter should be removed now that the batch
  // allocation has occurred!

  // Now `framework1` declines the offer.
  allocator->recoverResources(
      framework1.id(),
      agent2.id(),
      allocation.get().resources.get(agent2.id()).get(),
      None());

  // Total cluster resources (2 agents): cpus=2, mem=1024.
  // ROLE1 share = 0.5 (cpus=1, mem=512)
  //   framework1 share = 1 (cpus=1, mem=512)
  //   framework2 share = 0

  // Trigger a batch allocation.
  Clock::advance(flags.allocation_interval);

  // Since the filter is removed, resources are offered to `framework2`.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources (2 agents): cpus=2, mem=1024.
  // ROLE1 share = 1 (cpus=2, mem=1024)
  //   framework1 share = 0.5 (cpus=1, mem=512)
  //   framework2 share = 0.5 (cpus=1, mem=512)
}
| |
| |
| // This test ensures that agents which are scheduled for maintenance are |
| // properly sent inverse offers after they have accepted or reserved resources. |
| TEST_F(HierarchicalAllocatorTest, MaintenanceInverseOffers) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| // Create an agent. |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // This framework will be offered all of the resources. |
| FrameworkInfo framework = createFrameworkInfo("*"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| // Check that the resources go to the framework. |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| |
| const process::Time start = Clock::now() + Seconds(60); |
| |
| // Give the agent some unavailability. |
| allocator->updateUnavailability( |
| agent.id(), |
| protobuf::maintenance::createUnavailability( |
| start)); |
| |
| // Check the resources get inverse offered. |
| Future<Deallocation> deallocation = deallocations.get(); |
| AWAIT_READY(deallocation); |
| EXPECT_EQ(framework.id(), deallocation.get().frameworkId); |
| EXPECT_TRUE(deallocation.get().resources.contains(agent.id())); |
| |
| foreachvalue ( |
| const UnavailableResources& unavailableResources, |
| deallocation.get().resources) { |
| // The resources in the inverse offer are unspecified. |
| // This means everything is being requested back. |
| EXPECT_EQ(Resources(), unavailableResources.resources); |
| |
| EXPECT_EQ( |
| start.duration(), |
| Nanoseconds(unavailableResources.unavailability.start().nanoseconds())); |
| } |
| } |
| |
| |
| // This test ensures that allocation is done per slave. This is done |
| // by having 2 slaves and 2 frameworks and making sure each framework |
| // gets only one slave's resources during an allocation. |
| TEST_F(HierarchicalAllocatorTest, CoarseGrained) |
| { |
| // Pausing the clock ensures that the batch allocation does not |
| // influence this test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave1 = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), {}); |
| |
| SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {}); |
| |
| // Once framework1 is added, an allocation will occur. Return the |
| // resources so that we can test what happens when there are 2 |
| // frameworks and 2 slaves to consider during allocation. |
| FrameworkInfo framework1 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework1.id(), allocation.get().frameworkId); |
| EXPECT_EQ(slave1.resources() + slave2.resources(), |
| Resources::sum(allocation.get().resources)); |
| |
| allocator->recoverResources( |
| framework1.id(), |
| slave1.id(), |
| allocation.get().resources.get(slave1.id()).get(), |
| None()); |
| allocator->recoverResources( |
| framework1.id(), |
| slave2.id(), |
| allocation.get().resources.get(slave2.id()).get(), |
| None()); |
| |
| // Now add the second framework, we expect there to be 2 subsequent |
| // allocations, each framework being allocated a full slave. |
| FrameworkInfo framework2 = createFrameworkInfo("role2"); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| hashmap<FrameworkID, Allocation> frameworkAllocations; |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| frameworkAllocations[allocation.get().frameworkId] = allocation.get(); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| frameworkAllocations[allocation.get().frameworkId] = allocation.get(); |
| |
| // NOTE: `slave1` and `slave2` have the same resources, we don't care |
| // which framework received which slave, only that they each received one. |
| ASSERT_TRUE(frameworkAllocations.contains(framework1.id())); |
| ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size()); |
| EXPECT_EQ(slave1.resources(), |
| Resources::sum(frameworkAllocations[framework1.id()].resources)); |
| |
| ASSERT_TRUE(frameworkAllocations.contains(framework2.id())); |
| ASSERT_EQ(1u, frameworkAllocations[framework2.id()].resources.size()); |
| EXPECT_EQ(slave2.resources(), |
| Resources::sum(frameworkAllocations[framework2.id()].resources)); |
| } |
| |
| |
| // This test ensures that frameworks that have the same share get an |
| // equal number of allocations over time (rather than the same |
| // framework getting all the allocations because it's name is |
| // lexicographically ordered first). |
| TEST_F(HierarchicalAllocatorTest, SameShareFairness) |
| { |
| Clock::pause(); |
| |
| initialize(); |
| |
| FrameworkInfo framework1 = createFrameworkInfo("*"); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| FrameworkInfo framework2 = createFrameworkInfo("*"); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Ensure that the slave's resources are alternated between both |
| // frameworks. |
| hashmap<FrameworkID, size_t> counts; |
| |
| for (int i = 0; i < 10; i++) { |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| counts[allocation.get().frameworkId]++; |
| |
| ASSERT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| allocator->recoverResources( |
| allocation.get().frameworkId, |
| slave.id(), |
| allocation.get().resources.get(slave.id()).get(), |
| None()); |
| |
| Clock::advance(flags.allocation_interval); |
| } |
| |
| EXPECT_EQ(5u, counts[framework1.id()]); |
| EXPECT_EQ(5u, counts[framework2.id()]); |
| } |
| |
| |
| // Checks that resources on a slave that are statically reserved to |
| // a role are only offered to frameworks in that role. |
| TEST_F(HierarchicalAllocatorTest, Reservations) |
| { |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave1 = createSlaveInfo( |
| "cpus(role1):2;mem(role1):1024;disk(role1):0"); |
| allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), {}); |
| |
| SlaveInfo slave2 = createSlaveInfo( |
| "cpus(role2):2;mem(role2):1024;cpus:1;mem:1024;disk:0"); |
| allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {}); |
| |
| // This slave's resources should never be allocated, since there |
| // is no framework for role3. |
| SlaveInfo slave3 = createSlaveInfo( |
| "cpus(role3):1;mem(role3):1024;disk(role3):0"); |
| allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), {}); |
| |
| // framework1 should get all the resources from slave1, and the |
| // unreserved resources from slave2. |
| FrameworkInfo framework1 = createFrameworkInfo("role1"); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework1.id(), allocation.get().frameworkId); |
| EXPECT_EQ(2u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave1.id())); |
| EXPECT_TRUE(allocation.get().resources.contains(slave2.id())); |
| EXPECT_EQ(slave1.resources() + Resources(slave2.resources()).unreserved(), |
| Resources::sum(allocation.get().resources)); |
| |
| // framework2 should get all of its reserved resources on slave2. |
| FrameworkInfo framework2 = createFrameworkInfo("role2"); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework2.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave2.id())); |
| EXPECT_EQ(Resources(slave2.resources()).reserved("role2"), |
| Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // Checks that recovered resources are re-allocated correctly. |
| TEST_F(HierarchicalAllocatorTest, RecoverResources) |
| { |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo( |
| "cpus(role1):1;mem(role1):200;" |
| "cpus:1;mem:200;disk:0"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Initially, all the resources are allocated. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Recover the reserved resources, expect them to be re-offered. |
| Resources reserved = Resources(slave.resources()).reserved("role1"); |
| |
| allocator->recoverResources( |
| allocation.get().frameworkId, |
| slave.id(), |
| reserved, |
| None()); |
| |
| Clock::advance(flags.allocation_interval); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| EXPECT_EQ(reserved, Resources::sum(allocation.get().resources)); |
| |
| // Recover the unreserved resources, expect them to be re-offered. |
| Resources unreserved = Resources(slave.resources()).unreserved(); |
| |
| allocator->recoverResources( |
| allocation.get().frameworkId, |
| slave.id(), |
| unreserved, |
| None()); |
| |
| Clock::advance(flags.allocation_interval); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| EXPECT_EQ(unreserved, Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| TEST_F(HierarchicalAllocatorTest, Allocatable) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| // Not enough memory or cpu to be considered allocatable. |
| SlaveInfo slave1 = createSlaveInfo( |
| "cpus:" + stringify(MIN_CPUS / 2) + ";" |
| "mem:" + stringify((MIN_MEM / 2).megabytes()) + ";" |
| "disk:128"); |
| allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), {}); |
| |
| // Enough cpus to be considered allocatable. |
| SlaveInfo slave2 = createSlaveInfo( |
| "cpus:" + stringify(MIN_CPUS) + ";" |
| "mem:" + stringify((MIN_MEM / 2).megabytes()) + ";" |
| "disk:128"); |
| allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave2.id())); |
| EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Enough memory to be considered allocatable. |
| SlaveInfo slave3 = createSlaveInfo( |
| "cpus:" + stringify(MIN_CPUS / 2) + ";" |
| "mem:" + stringify((MIN_MEM).megabytes()) + ";" |
| "disk:128"); |
| allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), {}); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave3.id())); |
| EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources)); |
| |
| // slave4 has enough cpu and memory to be considered allocatable, |
| // but it lies across unreserved and reserved resources! |
| SlaveInfo slave4 = createSlaveInfo( |
| "cpus:" + stringify(MIN_CPUS / 1.5) + ";" |
| "mem:" + stringify((MIN_MEM / 2).megabytes()) + ";" |
| "cpus(role1):" + stringify(MIN_CPUS / 1.5) + ";" |
| "mem(role1):" + stringify((MIN_MEM / 2).megabytes()) + ";" |
| "disk:128"); |
| allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), {}); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave4.id())); |
| EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // This test ensures that frameworks can apply offer operations (e.g., |
| // creating persistent volumes) on their allocations. |
| TEST_F(HierarchicalAllocatorTest, UpdateAllocation) |
| { |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Initially, all the resources are allocated. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Construct an offer operation for the framework's allocation. |
| Resource volume = Resources::parse("disk", "5", "*").get(); |
| volume.mutable_disk()->mutable_persistence()->set_id("ID"); |
| volume.mutable_disk()->mutable_volume()->set_container_path("data"); |
| |
| Offer::Operation create; |
| create.set_type(Offer::Operation::CREATE); |
| create.mutable_create()->add_volumes()->CopyFrom(volume); |
| |
| // Ensure the offer operation can be applied. |
| Try<Resources> updated = |
| Resources::sum(allocation.get().resources).apply(create); |
| |
| ASSERT_SOME(updated); |
| |
| // Update the allocation in the allocator. |
| allocator->updateAllocation( |
| framework.id(), |
| slave.id(), |
| {create}); |
| |
| // Now recover the resources, and expect the next allocation to |
| // contain the updated resources. |
| allocator->recoverResources( |
| framework.id(), |
| slave.id(), |
| updated.get(), |
| None()); |
| |
| Clock::advance(flags.allocation_interval); |
| |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| |
| // The allocation should be the slave's resources with the offer |
| // operation applied. |
| updated = Resources(slave.resources()).apply(create); |
| ASSERT_SOME(updated); |
| |
| EXPECT_NE(Resources(slave.resources()), |
| Resources::sum(allocation.get().resources)); |
| |
| EXPECT_EQ(updated.get(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // This test ensures that a call to 'updateAvailable' succeeds when the |
| // allocator has sufficient available resources. |
| TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess) |
| { |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Construct an offer operation for the framework's allocation. |
| Resources unreserved = Resources::parse("cpus:25;mem:50").get(); |
| Resources dynamicallyReserved = |
| unreserved.flatten("role1", createReservationInfo("ops")); |
| |
| Offer::Operation reserve = RESERVE(dynamicallyReserved); |
| |
| // Update the allocation in the allocator. |
| Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve}); |
| AWAIT_EXPECT_READY(update); |
| |
| // Expect to receive the updated available resources. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| |
| // The allocation should be the slave's resources with the offer |
| // operation applied. |
| Try<Resources> updated = Resources(slave.resources()).apply(reserve); |
| ASSERT_SOME(updated); |
| |
| EXPECT_NE(Resources(slave.resources()), |
| Resources::sum(allocation.get().resources)); |
| |
| EXPECT_EQ(updated.get(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // This test ensures that a call to 'updateAvailable' fails when the |
| // allocator has insufficient available resources. |
| TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail) |
| { |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Expect to receive the all of the available resources. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Construct an offer operation for the framework's allocation. |
| Resources unreserved = Resources::parse("cpus:25;mem:50").get(); |
| Resources dynamicallyReserved = |
| unreserved.flatten("role1", createReservationInfo("ops")); |
| |
| Offer::Operation reserve = RESERVE(dynamicallyReserved); |
| |
| // Update the allocation in the allocator. |
| Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve}); |
| AWAIT_EXPECT_FAILED(update); |
| } |
| |
| // This test ensures that when oversubscribed resources are updated |
| // subsequent allocations properly account for that. |
| TEST_F(HierarchicalAllocatorTest, UpdateSlave) |
| { |
| // Pause clock to disable batch allocation. |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Add a framework that can accept revocable resources. |
| FrameworkInfo framework = createFrameworkInfo( |
| "role1", |
| {FrameworkInfo::Capability::REVOCABLE_RESOURCES}); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| // Initially, all the resources are allocated. |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Update the slave with 10 oversubscribed cpus. |
| Resources oversubscribed = createRevocableResources("cpus", "10"); |
| allocator->updateSlave(slave.id(), oversubscribed); |
| |
| // The next allocation should be for 10 oversubscribed resources. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources)); |
| |
| // Update the slave again with 12 oversubscribed cpus. |
| Resources oversubscribed2 = createRevocableResources("cpus", "12"); |
| allocator->updateSlave(slave.id(), oversubscribed2); |
| |
| // The next allocation should be for 2 oversubscribed cpus. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(oversubscribed2 - oversubscribed, |
| Resources::sum(allocation.get().resources)); |
| |
| // Update the slave again with 5 oversubscribed cpus. |
| Resources oversubscribed3 = createRevocableResources("cpus", "5"); |
| allocator->updateSlave(slave.id(), oversubscribed3); |
| |
| // Since there are no more available oversubscribed resources there |
| // shouldn't be an allocation. |
| Clock::settle(); |
| allocation = allocations.get(); |
| ASSERT_TRUE(allocation.isPending()); |
| } |
| |
| |
| // This test verifies that a framework that has not opted in for |
| // revocable resources do not get allocated oversubscribed resources. |
| TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated) |
| { |
| // Pause clock to disable batch allocation. |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Add a framework that does *not* accept revocable resources. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| // Initially, all the resources are allocated. |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Update the slave with 10 oversubscribed cpus. |
| Resources oversubscribed = createRevocableResources("cpus", "10"); |
| allocator->updateSlave(slave.id(), oversubscribed); |
| |
| // No allocation should be made for oversubscribed resources because |
| // the framework has not opted in for them. |
| Clock::settle(); |
| allocation = allocations.get(); |
| ASSERT_TRUE(allocation.isPending()); |
| } |
| |
| |
| // This test verifies that when oversubscribed resources are partially |
| // recovered subsequent allocation properly accounts for that. |
| TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources) |
| { |
| // Pause clock to disable batch allocation. |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| // Add a framework that can accept revocable resources. |
| FrameworkInfo framework = createFrameworkInfo( |
| "role1", |
| {FrameworkInfo::Capability::REVOCABLE_RESOURCES}); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| // Initially, all the resources are allocated. |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Update the slave with 10 oversubscribed cpus. |
| Resources oversubscribed = createRevocableResources("cpus", "10"); |
| allocator->updateSlave(slave.id(), oversubscribed); |
| |
| // The next allocation should be for 10 oversubscribed cpus. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources)); |
| |
| // Recover 6 oversubscribed cpus and 2 regular cpus. |
| Resources recovered = createRevocableResources("cpus", "6"); |
| recovered += Resources::parse("cpus:2").get(); |
| |
| allocator->recoverResources(framework.id(), slave.id(), recovered, None()); |
| |
| Clock::advance(flags.allocation_interval); |
| |
| // The next allocation should be for 6 oversubscribed and 2 regular |
| // cpus. |
| allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(recovered, Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // Checks that a slave that is not whitelisted will not have its |
| // resources get offered, and that if the whitelist is updated so |
| // that it is whitelisted, its resources will then be offered. |
| TEST_F(HierarchicalAllocatorTest, Whitelist) |
| { |
| Clock::pause(); |
| |
| initialize(); |
| |
| hashset<string> whitelist; |
| whitelist.insert("dummy-agent"); |
| |
| allocator->updateWhitelist(whitelist); |
| |
| SlaveInfo slave = createSlaveInfo("cpus:2;mem:1024"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| |
| FrameworkInfo framework = createFrameworkInfo("*"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| |
| // Ensure a batch allocation is triggered. |
| Clock::advance(flags.allocation_interval); |
| Clock::settle(); |
| |
| // There should be no allocation! |
| ASSERT_TRUE(allocation.isPending()); |
| |
| // Updating the whitelist to include the slave should |
| // trigger an allocation in the next batch. |
| whitelist.insert(slave.hostname()); |
| allocator->updateWhitelist(whitelist); |
| |
| Clock::advance(flags.allocation_interval); |
| |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(1u, allocation.get().resources.size()); |
| EXPECT_TRUE(allocation.get().resources.contains(slave.id())); |
| EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // The quota tests that are specific to the built-in Hierarchical DRF |
| // allocator (i.e. the way quota is satisfied) are in this file. |
| |
| // TODO(alexr): Additional tests we may want to implement: |
| // * A role has running tasks, quota is being set and is less than the |
| // current allocation, some tasks finish or are killed, but the role |
| // does not get new non-revocable offers (retroactively). |
| // * Multiple frameworks in a role with quota set, some agents fail, |
| // frameworks should be deprived fairly. |
| // * Multiple quota'ed roles, some agents fail, roles should be deprived |
| // according to their weights. |
| // * Oversubscribed resources should not count towards quota. |
| // * A role has dynamic reservations, quota is set and is less than total |
| // dynamic reservations. |
// * A role has dynamic reservations, quota is set and is greater than
//   total dynamic reservations. Resource math should count them towards
//   quota and not offer extra resources; dynamically reserved resources
//   should be offered as part of quota and not re-offered afterwards.
| |
// In the presence of quota'ed and non-quota'ed roles, if a framework in
// the quota'ed role declines offers, some resources are set aside for
// the role, so that a greedy framework from a non-quota'ed role cannot
// eat up all free resources.
TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  // Create `framework1` and set quota for its role.
  //
  // NOTE: The quota (cpus:2, mem:1024) equals the eventual size of the
  // cluster: two identical agents are added below, so satisfying the
  // quota requires laying away everything.
  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  const Quota quota = createQuota(QUOTA_ROLE, "cpus:2;mem:1024");
  allocator->setQuota(QUOTA_ROLE, quota);

  // Create `framework2` in a non-quota'ed role.
  FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  // `framework1` will be offered all of `agent1`'s resources because it is
  // the only framework in the only role with unsatisfied quota.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent1.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=1, mem=512.
  // QUOTA_ROLE share = 1 (cpus=1, mem=512) [quota: cpus=2, mem=1024]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // `framework1` will again be offered all of `agent2`'s resources
  // because it is the only framework in the only role with unsatisfied
  // quota. `framework2` has to wait.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=2, mem=1024.
  // QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // Now `framework1` declines the second offer and sets a filter for twice
  // the allocation interval. The declined resources should not be offered
  // to `framework2` because by doing so they may not be available to
  // `framework1` when the filter expires.
  Duration filterTimeout = flags.allocation_interval * 2;
  Filters offerFilter;
  offerFilter.set_refuse_seconds(filterTimeout.secs());

  allocator->recoverResources(
      framework1.id(),
      agent2.id(),
      allocation.get().resources.get(agent2.id()).get(),
      offerFilter);

  // Total cluster resources: cpus=2, mem=1024.
  // QUOTA_ROLE share = 0.5 (cpus=1, mem=512) [quota: cpus=2, mem=1024]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // Ensure the offer filter timeout is set before advancing the clock.
  //
  // NOTE: `recoverResources()` is async; settling guarantees the filter
  // (and its expiry timer) is installed before the batch allocation runs.
  Clock::settle();

  // Trigger a batch allocation.
  Clock::advance(flags.allocation_interval);
  Clock::settle();

  // There should be no allocation due to the offer filter: the declined
  // resources are held back for QUOTA_ROLE rather than given to
  // `framework2`.
  allocation = allocations.get();
  ASSERT_TRUE(allocation.isPending());

  // Ensure the offer filter times out (2x the allocation interval)
  // and the next batch allocation occurs.
  Clock::advance(flags.allocation_interval);

  // Previously declined resources should be offered to the quota'ed role.
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=2, mem=1024.
  // QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0
}
| |
| |
// If quota is removed, fair sharing should be restored in the cluster
// after sufficient number of tasks finish.
TEST_F(HierarchicalAllocatorTest, RemoveQuota)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  // The quota equals the total size of the cluster built below
  // (two agents with cpus:1;mem:512 each).
  const Quota quota = createQuota(QUOTA_ROLE, "cpus:2;mem:1024");
  allocator->setQuota(QUOTA_ROLE, quota);

  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // NOTE: Each agent is registered with all of its resources already
  // allocated to `framework1` (the last `addSlave()` argument), i.e.
  // the quota'ed role starts out using the entire cluster.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(
      agent1.id(),
      agent1,
      None(),
      agent1.resources(),
      {std::make_pair(framework1.id(), agent1.resources())});

  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(
      agent2.id(),
      agent2,
      None(),
      agent2.resources(),
      {std::make_pair(framework1.id(), agent2.resources())});

  // Total cluster resources (2 identical agents): cpus=2, mem=1024.
  // QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // All cluster resources are now being used by `framework1` as part of
  // its role quota, no further allocations are expected. However, once the
  // quota is removed, quota guarantee does not apply any more and released
  // resources should be offered to `framework2` to restore fairness.

  allocator->removeQuota(QUOTA_ROLE);

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  // Release `agent1`'s resources, simulating `framework1`'s tasks there
  // finishing.
  allocator->recoverResources(
      framework1.id(),
      agent1.id(),
      agent1.resources(),
      None());

  // Trigger the next batch allocation.
  Clock::advance(flags.allocation_interval);

  // With the quota gone, DRF should give the freed agent to the
  // framework with the lower share, i.e. `framework2`.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent1.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=2, mem=1024.
  // QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
  //   framework2 share = 1

  // The per-role quota metrics should no longer be exported after the
  // quota has been removed.
  JSON::Object metrics = Metrics();

  string metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/cpus"
    "/offered_or_allocated";
  EXPECT_EQ(0u, metrics.values.count(metric));

  metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/mem"
    "/offered_or_allocated";
  EXPECT_EQ(0u, metrics.values.count(metric));
}
| |
| |
| // If a quota'ed role contains multiple frameworks, the resources should |
| // be distributed fairly between them. However, inside the quota'ed role, |
| // if one framework declines resources, there is no guarantee the other |
| // framework in the same role does not consume all role's quota. |
TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  // Create `framework1a` and set quota for its role.
  FrameworkInfo framework1a = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1a.id(), framework1a, {});

  const Quota quota = createQuota(QUOTA_ROLE, "cpus:4;mem:2048");
  allocator->setQuota(QUOTA_ROLE, quota);

  // Create `framework2` in a non-quota'ed role.
  FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  // `framework1a` will be offered all of `agent1`'s resources because
  // it is the only framework in the only role with unsatisfied quota.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1a.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent1.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=1, mem=512.
  // QUOTA_ROLE share = 1 (cpus=1, mem=512) [quota: cpus=4, mem=2048]
  //   framework1a share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // Create `framework1b` in the quota'ed role.
  FrameworkInfo framework1b = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1b.id(), framework1b, {});

  SlaveInfo agent2 = createSlaveInfo("cpus:2;mem:1024;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // `framework1b` will be offered all of `agent2`'s resources
  // (coarse-grained allocation) because its share is 0 and it belongs
  // to a role with unsatisfied quota.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1b.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=3, mem=1536.
  // QUOTA_ROLE share = 1 (cpus=3, mem=1536) [quota: cpus=4, mem=2048]
  //   framework1a share = 0.33 (cpus=1, mem=512)
  //   framework1b share = 0.66 (cpus=2, mem=1024)
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  SlaveInfo agent3 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent3.id(), agent3, None(), agent3.resources(), {});

  // `framework1a` will be offered all of `agent3`'s resources because
  // its share is less than `framework1b`'s and `QUOTA_ROLE` still
  // has unsatisfied quota.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1a.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent3.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=4, mem=2048.
  // QUOTA_ROLE share = 1 (cpus=4, mem=2048) [quota: cpus=4, mem=2048]
  //   framework1a share = 0.5 (cpus=2, mem=1024)
  //   framework1b share = 0.5 (cpus=2, mem=1024)
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // If `framework1a` declines offered resources, they will be allocated to
  // `framework1b`: inside a quota'ed role there is no guarantee one
  // framework cannot consume the whole quota.
  Filters filter5s;
  filter5s.set_refuse_seconds(5.);
  allocator->recoverResources(
      framework1a.id(),
      agent3.id(),
      agent3.resources(),
      filter5s);

  // Trigger the next batch allocation.
  Clock::advance(flags.allocation_interval);

  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1b.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent3.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=4, mem=2048.
  // QUOTA_ROLE share = 1 (cpus=4, mem=2048) [quota: cpus=4, mem=2048]
  //   framework1a share = 0.25 (cpus=1, mem=512)
  //   framework1b share = 0.75 (cpus=3, mem=1536)
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0
}
| |
| |
| // The allocator performs coarse-grained allocations, and allocations |
| // to satisfy quota are no exception. A role may get more resources as |
| // part of its quota if the agent remaining resources are greater than |
| // the unsatisfied part of the role's quota. |
| TEST_F(HierarchicalAllocatorTest, QuotaAllocationGranularity) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| const string QUOTA_ROLE{"quota-role"}; |
| const string NO_QUOTA_ROLE{"no-quota-role"}; |
| |
| initialize(); |
| |
| // Create `framework1` and set quota for its role. |
| FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| // Set quota to be less than the agent resources. |
| const Quota quota = createQuota(QUOTA_ROLE, "cpus:0.5;mem:200"); |
| allocator->setQuota(QUOTA_ROLE, quota); |
| |
| // Create `framework2` in a non-quota'ed role. |
| FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| // Process all triggered allocation events. |
| // |
| // NOTE: No allocations happen because there are no resources to allocate. |
| Clock::settle(); |
| |
| SlaveInfo agent = createSlaveInfo("cpus:1;mem:512;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // `framework1` will be offered all of `agent`'s resources because |
| // it is the only framework in the only role with unsatisfied quota |
| // and the allocator performs coarse-grained allocation. |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework1.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| EXPECT_TRUE(Resources(agent.resources()).contains(quota.info.guarantee())); |
| |
| // Total cluster resources: cpus=1, mem=512. |
| // QUOTA_ROLE share = 1 (cpus=1, mem=512) [quota: cpus=0.5, mem=200] |
| // framework1 share = 1 |
| // NO_QUOTA_ROLE share = 0 |
| // framework2 share = 0 |
| } |
| |
| |
// This test verifies that the free pool (what is left after all quotas
// are satisfied) is allocated according to the DRF algorithm across the roles
// which do not have quota set.
TEST_F(HierarchicalAllocatorTest, DRFWithQuota)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  const Quota quota = createQuota(QUOTA_ROLE, "cpus:0.25;mem:128");
  allocator->setQuota(QUOTA_ROLE, quota);

  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  // The quota guarantee should be exported via metrics as soon as the
  // quota is set, even before any resources exist in the cluster.
  JSON::Object metrics = Metrics();

  string metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/cpus"
    "/guarantee";
  EXPECT_EQ(0.25, metrics.values[metric]);

  metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/mem"
    "/guarantee";
  EXPECT_EQ(128, metrics.values[metric]);

  // Add an agent with some allocated resources: exactly the quota
  // guarantee is already in use by `framework1`.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(
      agent1.id(),
      agent1,
      None(),
      agent1.resources(),
      {std::make_pair(framework1.id(), Resources(quota.info.guarantee()))});

  // Total cluster resources (1 agent): cpus=1, mem=512.
  // QUOTA_ROLE share = 0.25 (cpus=0.25, mem=128) [quota: cpus=0.25, mem=128]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // Some resources on `agent1` are now being used by `framework1` as part
  // of its role quota. All quotas are satisfied, all available resources
  // should be allocated according to fair shares of roles and frameworks.

  // `framework2` will be offered all of `agent1`'s resources because its
  // share is 0.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent1.resources() - Resources(quota.info.guarantee()),
            Resources::sum(allocation.get().resources));

  // The allocated portion of the quota should now be reflected in the
  // metrics. No disk quota was set, so no disk metric should exist.
  metrics = Metrics();

  metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/cpus"
    "/offered_or_allocated";
  EXPECT_EQ(0.25, metrics.values[metric]);

  metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/mem"
    "/offered_or_allocated";
  EXPECT_EQ(128, metrics.values[metric]);

  metric =
    "allocator/mesos/quota"
    "/roles/" + QUOTA_ROLE +
    "/resources/disk"
    "/offered_or_allocated";
  EXPECT_EQ(0u, metrics.values.count(metric));

  // Total cluster resources (1 agent): cpus=1, mem=512.
  // QUOTA_ROLE share = 0.25 (cpus=0.25, mem=128) [quota: cpus=0.25, mem=128]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0.75 (cpus=0.75, mem=384)
  //   framework2 share = 1

  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // `framework2` will be offered all of `agent2`'s resources (coarse-grained
  // allocation). `framework1` does not receive them even though it has a
  // smaller allocation, since we have already satisfied its role's quota.
  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));
}
| |
| |
// This test addresses a so-called "starvation" case. Suppose there are
// several frameworks below their fair share: they decline any offers they
// get. There is also a framework which fully utilizes its share and would
// accept more resources if they were offered. However, if there are not
// many free resources available and the decline timeout is small enough,
// free resources may circulate between frameworks underutilizing their fair
// share and might never be offered to the framework that needs them. While
// this behavior corresponds to the way the DRF algorithm works, it might not
// be desirable in some cases. Setting quota for a "starving" role can
// mitigate the issue.
TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // `agent1` is added with all of its resources already in use by
  // `framework1`, so no offer is triggered for it.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(
      agent1.id(),
      agent1,
      None(),
      agent1.resources(),
      {std::make_pair(framework1.id(), agent1.resources())});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because all resources are already allocated.
  Clock::settle();

  // Total cluster resources (1 agent): cpus=1, mem=512.
  // QUOTA_ROLE share = 1 (cpus=1, mem=512)
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // Free cluster resources on `agent2` will be allocated to `framework2`
  // because its share is 0.

  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources (2 identical agents): cpus=2, mem=1024.
  // QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
  //   framework2 share = 1

  // If `framework2` declines offered resources with 0 timeout, they will
  // be returned to the free pool and then allocated to `framework2` again,
  // because its share is still 0. This models the "starvation" cycle this
  // test is about: `framework1` never sees these resources.
  Filters filter0s;
  filter0s.set_refuse_seconds(0.);
  allocator->recoverResources(
      framework2.id(),
      agent2.id(),
      agent2.resources(),
      filter0s);

  // Total cluster resources (2 identical agents): cpus=2, mem=1024.
  // QUOTA_ROLE share = 0.5 (cpus=1, mem=512)
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0

  // Trigger the next batch allocation.
  Clock::advance(flags.allocation_interval);

  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // `framework2` continues declining offers.
  allocator->recoverResources(
      framework2.id(),
      agent2.id(),
      agent2.resources(),
      filter0s);

  // We set quota for the "starving" `QUOTA_ROLE` role.
  const Quota quota = createQuota(QUOTA_ROLE, "cpus:2;mem:1024");
  allocator->setQuota(QUOTA_ROLE, quota);

  // Since `QUOTA_ROLE` is under quota, `agent2`'s resources will
  // be allocated to `framework1`: quota breaks the decline/re-offer
  // cycle in favor of the starving role.

  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources: cpus=2, mem=1024.
  // QUOTA_ROLE share = 1 (cpus=2, mem=1024) [quota: cpus=2, mem=1024]
  //   framework1 share = 1
  // NO_QUOTA_ROLE share = 0
  //   framework2 share = 0
}
| |
| |
| // This test checks that quota is respected even for roles that do not |
| // have any frameworks currently registered. It also ensures an event- |
| // triggered allocation does not unnecessarily deprive non-quota'ed |
| // frameworks of resources. |
TEST_F(HierarchicalAllocatorTest, QuotaAbsentFramework)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  // Set quota for the quota'ed role. This role isn't registered with
  // the allocator yet.
  const Quota quota = createQuota(QUOTA_ROLE, "cpus:2;mem:1024");
  allocator->setQuota(QUOTA_ROLE, quota);

  // Add `framework` in the non-quota'ed role.
  FrameworkInfo framework = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework.id(), framework, {});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  // Total cluster resources (0 agents): 0.
  // QUOTA_ROLE share = 0 [quota: cpus=2, mem=1024]
  //   no frameworks
  // NO_QUOTA_ROLE share = 0
  //   framework share = 0

  // Each `addSlave()` triggers an event-based allocation.
  //
  // NOTE: The second event-based allocation for `agent2` takes into account
  // that `agent1`'s resources are laid away for `QUOTA_ROLE`'s quota and
  // hence freely allocates for the non-quota'ed `NO_QUOTA_ROLE` role.
  //
  // NOTE: `agent1` matches the quota guarantee exactly, so all of it is
  // set aside for the (frameworkless) `QUOTA_ROLE`.
  SlaveInfo agent1 = createSlaveInfo("cpus:2;mem:1024;disk:0");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // `framework` can only be allocated resources on `agent2`. This
  // is due to the coarse-grained nature of the allocations. All the
  // free resources on `agent1` would be considered to construct an
  // offer, and that would exceed the resources allowed to be offered
  // to the non-quota'ed role.
  //
  // NOTE: We would prefer to test that, without the presence of
  // `agent2`, `framework` is not allocated anything. However, we
  // can't easily test for the absence of an allocation from the
  // framework side, so we make do with this instead.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent2.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources (2 agents): cpus=3, mem=1536.
  // QUOTA_ROLE share = 0 [quota: cpus=2, mem=1024], but
  //                  (cpus=2, mem=1024) are laid away
  //   no frameworks
  // NO_QUOTA_ROLE share = 0.33
  //   framework share = 1 (cpus=1, mem=512)
}
| |
| |
| // This test checks that if one role with quota has no frameworks in it, |
| // other roles with quota are still offered resources. Roles without |
| // frameworks have zero fair share and are always considered first during |
| // allocation, hence this test actually addresses several scenarios: |
| // * Quota'ed roles without frameworks do not prevent other quota'ed roles |
| // from getting resources. |
| // * Resources are not laid away for quota'ed roles without frameworks if |
| // there are other quota'ed roles with not fully satisfied quota. |
| TEST_F(HierarchicalAllocatorTest, MultiQuotaAbsentFrameworks) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| const string QUOTA_ROLE1{"quota-role-1"}; |
| const string QUOTA_ROLE2{"quota-role-2"}; |
| |
| initialize(); |
| |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:2048;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // Set quota for both roles. |
| const Quota quota1 = createQuota(QUOTA_ROLE1, "cpus:1;mem:1024"); |
| allocator->setQuota(QUOTA_ROLE1, quota1); |
| |
| const Quota quota2 = createQuota(QUOTA_ROLE2, "cpus:2;mem:2048"); |
| allocator->setQuota(QUOTA_ROLE2, quota2); |
| |
| // Add a framework in the `QUOTA_ROLE2` role. |
| FrameworkInfo framework = createFrameworkInfo(QUOTA_ROLE2); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| // Due to the coarse-grained nature of the allocations, `framework` will |
| // get all `agent`'s resources. |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // This test checks that if there are multiple roles with quota, all of them |
| // get enough offers given there are enough resources. Suppose one quota'ed |
| // role has smaller share and is fully satisfied. Another quota'ed role has |
| // greater share but its quota is not fully satisfied yet. Though the first |
| // role is considered before the second because it has smaller share, this |
| // should not lead to starvation of the second role. |
TEST_F(HierarchicalAllocatorTest, MultiQuotaWithFrameworks)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE1{"quota-role-1"};
  const string QUOTA_ROLE2{"quota-role-2"};

  initialize();

  // Mem Quota for `QUOTA_ROLE1` is 10 times smaller than for `QUOTA_ROLE2`.
  const Quota quota1 = createQuota(QUOTA_ROLE1, "cpus:1;mem:200");
  allocator->setQuota(QUOTA_ROLE1, quota1);

  const Quota quota2 = createQuota(QUOTA_ROLE2, "cpus:2;mem:2000");
  allocator->setQuota(QUOTA_ROLE2, quota2);

  // Add `framework1` in the `QUOTA_ROLE1` role.
  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE1);
  allocator->addFramework(framework1.id(), framework1, {});

  // Add `framework2` in the `QUOTA_ROLE2` role.
  FrameworkInfo framework2 = createFrameworkInfo(QUOTA_ROLE2);
  allocator->addFramework(framework2.id(), framework2, {});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  // Each agent is added with all of its resources already in use by the
  // respective framework, so adding them triggers no offers.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:1024;disk:0");
  allocator->addSlave(
      agent1.id(),
      agent1,
      None(),
      agent1.resources(),
      {std::make_pair(framework1.id(), agent1.resources())});

  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:1024;disk:0");
  allocator->addSlave(
      agent2.id(),
      agent2,
      None(),
      agent2.resources(),
      {std::make_pair(framework2.id(), agent2.resources())});

  // Total cluster resources (2 identical agents): cpus=2, mem=2048.
  // QUOTA_ROLE1 share = 0.5 (cpus=1, mem=1024) [quota: cpus=1, mem=200]
  //   framework1 share = 1
  // QUOTA_ROLE2 share = 0.5 (cpus=1, mem=1024) [quota: cpus=2, mem=2000]
  //   framework2 share = 1

  // Quota for the `QUOTA_ROLE1` role is satisfied, while `QUOTA_ROLE2` is
  // under quota. Hence resources of the newly added agent should be offered
  // to the framework in `QUOTA_ROLE2`, even though `QUOTA_ROLE1` has the
  // smaller share and is therefore considered first.

  SlaveInfo agent3 = createSlaveInfo("cpus:2;mem:2048");
  allocator->addSlave(agent3.id(), agent3, None(), agent3.resources(), {});

  // `framework2` will get all agent3's resources because its role is under
  // quota, while other roles' quotas are satisfied.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent3.resources(), Resources::sum(allocation.get().resources));

  // Total cluster resources (3 agents): cpus=4, mem=4096.
  // QUOTA_ROLE1 share = 0.25 (cpus=1, mem=1024) [quota: cpus=1, mem=200]
  //   framework1 share = 1
  // QUOTA_ROLE2 share = 0.75 (cpus=3, mem=3072) [quota: cpus=2, mem=2000]
  //   framework2 share = 1
}
| |
| |
| // This tests that reserved resources are accounted for in the role's quota. |
TEST_F(HierarchicalAllocatorTest, ReservationWithinQuota)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NON_QUOTA_ROLE{"non-quota-role"};

  initialize();

  const Quota quota = createQuota(QUOTA_ROLE, "cpus:2;mem:256");
  allocator->setQuota(QUOTA_ROLE, quota);

  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  FrameworkInfo framework2 = createFrameworkInfo(NON_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // Process all triggered allocation events.
  //
  // NOTE: No allocations happen because there are no resources to allocate.
  Clock::settle();

  // Some resources on `agent1` are now being used by `framework1` as part
  // of its role quota. `framework2` will be offered the rest of `agent1`'s
  // resources since `framework1`'s quota is satisfied, and `framework2` has
  // no resources.
  //
  // NOTE: The agent's 256MB of memory is statically reserved for
  // `QUOTA_ROLE` and is entirely in use by `framework1` below.
  SlaveInfo agent1 = createSlaveInfo("cpus:8;mem(" + QUOTA_ROLE + "):256");
  allocator->addSlave(
      agent1.id(),
      agent1,
      None(),
      agent1.resources(),
      {std::make_pair(
          framework1.id(),
          // The `mem` portion is used to test that reserved resources are
          // accounted for, and the `cpus` portion is allocated to show that
          // the result of DRF would be different if `mem` was not accounted.
          Resources::parse("cpus:2;mem(" + QUOTA_ROLE + "):256").get())});

  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);

  // Only the remaining unreserved CPUs are offered to `framework2`.
  EXPECT_EQ(Resources::parse("cpus:6").get(),
            Resources::sum(allocation.get().resources));

  // Since the reserved resources account towards the quota as well as being
  // accounted for DRF, we expect these resources to also be allocated to
  // `framework2`.
  SlaveInfo agent2 = createSlaveInfo("cpus:4");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);

  EXPECT_EQ(Resources::parse("cpus:4").get(),
            Resources::sum(allocation.get().resources));
}
| |
| |
| // This test checks that when setting aside unallocated resources to |
| // ensure that a quota guarantee can be met, we don't use resources |
| // that have been reserved for a different role. |
| // |
| // We setup a scenario with 8 CPUs, where role X has quota for 4 CPUs |
| // and role Y has 4 CPUs reserved. All offers are declined; the 4 |
| // unreserved CPUs should not be offered to role Y. |
TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources)
{
  // Pause the clock so only event-triggered allocations and explicitly
  // advanced batch allocations run.
  Clock::pause();

  const string QUOTA_ROLE{"quota-role"};
  const string NO_QUOTA_ROLE{"no-quota-role"};

  initialize();

  // Create two agents.
  SlaveInfo agent1 = createSlaveInfo("cpus:4;mem:512;disk:0");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  SlaveInfo agent2 = createSlaveInfo("cpus:4;mem:512;disk:0");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});

  // Reserve 4 CPUs and 512MB of memory on `agent2` for non-quota'ed role.
  Resources unreserved = Resources::parse("cpus:4;mem:512").get();
  Resources dynamicallyReserved =
    unreserved.flatten(NO_QUOTA_ROLE, createReservationInfo("ops"));

  Offer::Operation reserve = RESERVE(dynamicallyReserved);

  Future<Nothing> updateAgent2 =
    allocator->updateAvailable(agent2.id(), {reserve});

  AWAIT_EXPECT_READY(updateAgent2);

  // Create `framework1` and set quota for its role.
  FrameworkInfo framework1 = createFrameworkInfo(QUOTA_ROLE);
  allocator->addFramework(framework1.id(), framework1, {});

  const Quota quota = createQuota(QUOTA_ROLE, "cpus:4");
  allocator->setQuota(QUOTA_ROLE, quota);

  // `framework1` will be offered resources at `agent1` because the
  // resources at `agent2` are reserved for a different role.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
  EXPECT_EQ(agent1.resources(), Resources::sum(allocation.get().resources));

  // `framework1` declines the resources on `agent1` for the duration
  // of the test.
  Filters longFilter;
  longFilter.set_refuse_seconds(flags.allocation_interval.secs() * 10);

  allocator->recoverResources(
      framework1.id(),
      agent1.id(),
      agent1.resources(),
      longFilter);

  // Trigger a batch allocation for good measure, but don't expect any
  // allocations.
  Clock::advance(flags.allocation_interval);
  Clock::settle();

  // The future stays pending: the declined unreserved CPUs must be set
  // aside for `QUOTA_ROLE` rather than offered elsewhere.
  allocation = allocations.get();
  EXPECT_TRUE(allocation.isPending());

  // Create `framework2` in a non-quota'ed role.
  FrameworkInfo framework2 = createFrameworkInfo(NO_QUOTA_ROLE);
  allocator->addFramework(framework2.id(), framework2, {});

  // `framework2` will be offered the reserved resources at `agent2`
  // because those resources are reserved for its role.
  AWAIT_READY(allocation);
  EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
  EXPECT_EQ(dynamicallyReserved, Resources::sum(allocation.get().resources));

  // `framework2` declines the resources on `agent2` for the duration
  // of the test.
  allocator->recoverResources(
      framework2.id(),
      agent2.id(),
      dynamicallyReserved,
      longFilter);

  // No more resource offers should be made until the filters expire:
  // `framework1` should not be offered the resources at `agent2`
  // (because they are reserved for a different role), and
  // `framework2` should not be offered the resources at `agent1`
  // (because this would risk violating quota guarantees).

  // Trigger a batch allocation for good measure, but don't expect any
  // allocations.
  Clock::advance(flags.allocation_interval);
  Clock::settle();

  allocation = allocations.get();
  EXPECT_TRUE(allocation.isPending());
}
| |
| |
| // This test checks that if a framework suppresses offers, disconnects and |
| // reconnects again, it will start receiving resource offers again. |
| TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| // Total cluster resources will become cpus=2, mem=1024. |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // Framework will be offered all of the agent's resources since it is |
| // the only framework running so far. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| |
| allocator->recoverResources( |
| framework.id(), |
| agent.id(), |
| agent.resources(), |
| None()); |
| |
| // Suppress offers and disconnect framework. |
| allocator->suppressOffers(framework.id()); |
| allocator->deactivateFramework(framework.id()); |
| |
| // Advance the clock and trigger a background allocation cycle. |
| Clock::advance(flags.allocation_interval); |
| |
| // Wait for all the `suppressOffers` and `deactivateFramework` |
| // operations to be processed. |
| Clock::settle(); |
| |
| allocation = allocations.get(); |
| EXPECT_TRUE(allocation.isPending()); |
| |
| // Reconnect the framework again. |
| allocator->activateFramework(framework.id()); |
| |
| // Framework will be offered all of agent's resources again |
| // after getting activated. |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
| // This test verifies that offer suppression and revival work as intended. |
| TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| // Total cluster resources will become cpus=2, mem=1024. |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // Framework will be offered all of the agent's resources since it is |
| // the only framework running so far. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| |
| // Here the revival is totally unnecessary but we should tolerate the |
| // framework's redundant REVIVE calls. |
| allocator->reviveOffers(framework.id()); |
| |
| // Nothing is allocated because of no additional resources. |
| allocation = allocations.get(); |
| EXPECT_TRUE(allocation.isPending()); |
| |
| allocator->recoverResources( |
| framework.id(), |
| agent.id(), |
| agent.resources(), |
| None()); |
| |
| allocator->suppressOffers(framework.id()); |
| |
| // Advance the clock and trigger a background allocation cycle. |
| Clock::advance(flags.allocation_interval); |
| Clock::settle(); |
| |
| // Still pending because the framework has suppressed offers. |
| EXPECT_TRUE(allocation.isPending()); |
| |
| // Revive again and this time it should work. |
| allocator->reviveOffers(framework.id()); |
| |
| // Framework will be offered all of agent's resources again after |
| // reviving offers. |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
// This test checks that total and allocated (offered) resources
// are correctly reflected in the metrics endpoint.
TEST_F(HierarchicalAllocatorTest, ResourceMetrics)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  initialize();

  SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0");
  allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});
  Clock::settle();

  JSON::Object expected;

  // No frameworks are registered yet, so the agent's resources are
  // reflected in the totals but nothing is offered or allocated.
  expected.values = {
      {"allocator/mesos/resources/cpus/total", 2},
      {"allocator/mesos/resources/mem/total", 1024},
      {"allocator/mesos/resources/disk/total", 0},
      {"allocator/mesos/resources/cpus/offered_or_allocated", 0},
      {"allocator/mesos/resources/mem/offered_or_allocated", 0},
      {"allocator/mesos/resources/disk/offered_or_allocated", 0},
  };

  JSON::Value metrics = Metrics();

  EXPECT_TRUE(metrics.contains(expected));

  FrameworkInfo framework = createFrameworkInfo("role1");
  allocator->addFramework(framework.id(), framework, {});
  Clock::settle();

  // All of the resources should be offered.
  expected.values = {
      {"allocator/mesos/resources/cpus/total", 2},
      {"allocator/mesos/resources/mem/total", 1024},
      {"allocator/mesos/resources/disk/total", 0},
      {"allocator/mesos/resources/cpus/offered_or_allocated", 2},
      {"allocator/mesos/resources/mem/offered_or_allocated", 1024},
      {"allocator/mesos/resources/disk/offered_or_allocated", 0},
  };

  metrics = Metrics();

  EXPECT_TRUE(metrics.contains(expected));

  allocator->removeSlave(agent.id());
  Clock::settle();

  // The only agent has been removed, so both the totals and the
  // offered/allocated amounts should drop back to zero.
  expected.values = {
      {"allocator/mesos/resources/cpus/total", 0},
      {"allocator/mesos/resources/mem/total", 0},
      {"allocator/mesos/resources/disk/total", 0},
      {"allocator/mesos/resources/cpus/offered_or_allocated", 0},
      {"allocator/mesos/resources/mem/offered_or_allocated", 0},
      {"allocator/mesos/resources/disk/offered_or_allocated", 0},
  };

  metrics = Metrics();

  EXPECT_TRUE(metrics.contains(expected));
}
| |
| |
| // This test checks that the number of times the allocation |
| // algorithm has run is correctly reflected in the metric. |
| TEST_F(HierarchicalAllocatorTest, AllocationRunsMetric) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| size_t allocations = 0; |
| |
| JSON::Object expected; |
| |
| expected.values = { {"allocator/mesos/allocation_runs", allocations} }; |
| |
| JSON::Value metrics = Metrics(); |
| |
| EXPECT_TRUE(metrics.contains(expected)); |
| |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| ++allocations; // Adding an agent triggers allocations. |
| |
| FrameworkInfo framework = createFrameworkInfo("role"); |
| allocator->addFramework(framework.id(), framework, {}); |
| ++allocations; // Adding a framework triggers allocations. |
| |
| Clock::settle(); |
| |
| expected.values = { {"allocator/mesos/allocation_runs", allocations} }; |
| |
| metrics = Metrics(); |
| |
| EXPECT_TRUE(metrics.contains(expected)); |
| } |
| |
| |
// This test checks that the allocation run timer
// metrics are reported in the metrics endpoint.
TEST_F(HierarchicalAllocatorTest, AllocationRunTimerMetrics)
{
  Clock::pause();

  initialize();

  // These time series statistics will be generated
  // once at least 2 allocation runs occur.
  auto statistics = {
    "allocator/mesos/allocation_run_ms/count",
    "allocator/mesos/allocation_run_ms/min",
    "allocator/mesos/allocation_run_ms/max",
    "allocator/mesos/allocation_run_ms/p50",
    "allocator/mesos/allocation_run_ms/p95",
    "allocator/mesos/allocation_run_ms/p99",
    "allocator/mesos/allocation_run_ms/p999",
    "allocator/mesos/allocation_run_ms/p9999",
  };

  JSON::Object metrics = Metrics();
  map<string, JSON::Value> values = metrics.values;

  EXPECT_EQ(0u, values.count("allocator/mesos/allocation_run_ms"));

  // No allocation timing statistics should appear yet, since
  // no allocation run has completed.
  foreach (const string& statistic, statistics) {
    EXPECT_EQ(0u, values.count(statistic))
      << "Expected " << statistic << " to be absent";
  }

  // Allow the allocation timer to measure real elapsed time.
  Clock::resume();

  // Trigger at least two allocation runs
  // to generate the window statistics.
  SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0");
  allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});

  FrameworkInfo framework = createFrameworkInfo("role1");
  allocator->addFramework(framework.id(), framework, {});

  // Wait for the allocation to complete.
  AWAIT_READY(allocations.get());

  // Ensure the timer has been stopped so that
  // the second measurement can be recorded.
  Clock::pause();
  Clock::settle();
  Clock::resume();

  metrics = Metrics();
  values = metrics.values;

  // A non-zero measurement should be present.
  EXPECT_EQ(1u, values.count("allocator/mesos/allocation_run_ms"));

  JSON::Value value = metrics.values["allocator/mesos/allocation_run_ms"];
  ASSERT_TRUE(value.is<JSON::Number>()) << value.which();

  JSON::Number timing = value.as<JSON::Number>();
  ASSERT_EQ(JSON::Number::FLOATING, timing.type);
  EXPECT_GT(timing.as<double>(), 0.0);

  // The statistics should now be generated.
  foreach (const string& statistic, statistics) {
    EXPECT_EQ(1u, values.count(statistic))
      << "Expected " << statistic << " to be present";
  }
}
| |
| |
| // This test checks that per-role active offer filter metrics |
| // are correctly reported in the metrics endpoint. |
| TEST_F(HierarchicalAllocatorTest, ActiveOfferFiltersMetrics) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // Register three frameworks, two of which are in the same role. |
| // For every offer the frameworks install practically indefinite |
| // offer filters. |
| Duration filterTimeout = flags.allocation_interval * 100; |
| Filters offerFilter; |
| offerFilter.set_refuse_seconds(filterTimeout.secs()); |
| |
| FrameworkInfo framework1 = createFrameworkInfo("roleA"); |
| allocator->addFramework(framework1.id(), framework1, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| |
| AWAIT_READY(allocation); |
| ASSERT_EQ(framework1.id(), allocation->frameworkId); |
| |
| allocator->recoverResources( |
| allocation->frameworkId, |
| agent.id(), |
| allocation->resources.get(agent.id()).get(), |
| offerFilter); |
| |
| JSON::Object expected; |
| expected.values = { |
| {"allocator/mesos/offer_filters/roles/roleA/active", 1}, |
| }; |
| |
| JSON::Value metrics = Metrics(); |
| |
| EXPECT_TRUE(metrics.contains(expected)); |
| |
| FrameworkInfo framework2 = createFrameworkInfo("roleB"); |
| allocator->addFramework(framework2.id(), framework2, {}); |
| |
| allocation = allocations.get(); |
| |
| AWAIT_READY(allocation); |
| ASSERT_EQ(framework2.id(), allocation->frameworkId); |
| |
| allocator->recoverResources( |
| allocation->frameworkId, |
| agent.id(), |
| allocation->resources.get(agent.id()).get(), |
| offerFilter); |
| |
| expected.values = { |
| {"allocator/mesos/offer_filters/roles/roleA/active", 1}, |
| {"allocator/mesos/offer_filters/roles/roleB/active", 1}, |
| }; |
| |
| metrics = Metrics(); |
| |
| EXPECT_TRUE(metrics.contains(expected)); |
| |
| FrameworkInfo framework3 = createFrameworkInfo("roleA"); |
| allocator->addFramework(framework3.id(), framework3, {}); |
| |
| allocation = allocations.get(); |
| |
| AWAIT_READY(allocation); |
| ASSERT_EQ(framework3.id(), allocation->frameworkId); |
| |
| allocator->recoverResources( |
| allocation->frameworkId, |
| agent.id(), |
| allocation->resources.get(agent.id()).get(), |
| offerFilter); |
| |
| expected.values = { |
| {"allocator/mesos/offer_filters/roles/roleA/active", 2}, |
| {"allocator/mesos/offer_filters/roles/roleB/active", 1}, |
| }; |
| |
| metrics = Metrics(); |
| |
| EXPECT_TRUE(metrics.contains(expected)); |
| } |
| |
| |
// Verifies that per-role dominant share metrics are correctly reported.
TEST_F(HierarchicalAllocatorTest, DominantShareMetrics)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  initialize();

  // Register one agent and one framework. The framework will
  // immediately receive an offer, giving it the maximum
  // possible dominant share.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:1024");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  FrameworkInfo framework1 = createFrameworkInfo("roleA");
  allocator->addFramework(framework1.id(), framework1, {});
  Clock::settle();

  JSON::Object expected;

  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 1},
  };

  JSON::Value metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));

  // Decline the offered resources and expect a zero share.
  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);
  allocator->recoverResources(
      allocation->frameworkId,
      agent1.id(),
      allocation->resources.get(agent1.id()).get(),
      None());
  Clock::settle();

  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 0},
  };

  metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));

  // Register a second framework. This framework will receive
  // offers as `framework1` has just declined an offer and the
  // implicit filter has not yet timed out. The new framework
  // will have the full share.
  FrameworkInfo framework2 = createFrameworkInfo("roleB");
  allocator->addFramework(framework2.id(), framework2, {});
  Clock::settle();

  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 0},
      {"allocator/mesos/roles/roleB/shares/dominant", 1},
  };

  metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));

  // Add a second, identical agent. Now `framework1` will
  // receive an offer since it has the lowest dominant
  // share. After the offer the dominant shares of
  // `framework1` and `framework2` are equal.
  SlaveInfo agent2 = createSlaveInfo("cpus:1;mem:1024");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});
  Clock::settle();

  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 0.5},
      {"allocator/mesos/roles/roleB/shares/dominant", 0.5},
  };

  metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));

  // Removing `framework2` frees up its allocated resources. The
  // corresponding metric is removed when the last framework in
  // the role is removed.
  allocator->removeFramework(framework2.id());
  Clock::settle();

  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 0.5},
  };

  metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));

  // The roleB metric must be gone entirely, not merely zero.
  ASSERT_TRUE(metrics.is<JSON::Object>());
  map<string, JSON::Value> values = metrics.as<JSON::Object>().values;
  EXPECT_EQ(0u, values.count("allocator/mesos/roles/roleB/shares/dominant"));
}
| |
| |
// Verifies that per-role dominant share metrics are correctly
// reported when resources are excluded from fair sharing.
TEST_F(HierarchicalAllocatorTest, DominantShareMetricsWithFairnessExclusion)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  // Specify that `gpus` should not be fairly shared.
  master::Flags flags_;
  flags_.fair_sharing_excluded_resource_names = set<string>({"gpus"});

  initialize(flags_);

  // Register one agent and one framework. The framework will
  // immediately receive an offer, giving it the maximum
  // possible dominant share.
  SlaveInfo agent1 = createSlaveInfo("cpus:1;mem:1024;gpus:1");
  allocator->addSlave(agent1.id(), agent1, None(), agent1.resources(), {});

  // The framework needs the GPU_RESOURCES capability to be
  // offered gpus at all.
  FrameworkInfo framework1 = createFrameworkInfo(
      "roleA", {FrameworkInfo::Capability::GPU_RESOURCES});

  allocator->addFramework(framework1.id(), framework1, {});
  Clock::settle();

  JSON::Object expected;

  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 1},
  };

  JSON::Value metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));

  FrameworkInfo framework2 = createFrameworkInfo("roleB");
  allocator->addFramework(framework2.id(), framework2, {});
  Clock::settle();

  // Add a second, identical agent. Now `framework2` will
  // receive an offer since it has the lowest dominant share:
  // the 100% of `gpus` allocated to framework1 are excluded!
  SlaveInfo agent2 = createSlaveInfo("cpus:3;mem:3072");
  allocator->addSlave(agent2.id(), agent2, None(), agent2.resources(), {});
  Clock::settle();

  // roleA holds 1 of 4 cpus (gpus are excluded from the share
  // computation); roleB holds 3 of 4 cpus.
  expected.values = {
      {"allocator/mesos/roles/roleA/shares/dominant", 0.25},
      {"allocator/mesos/roles/roleB/shares/dominant", 0.75},
  };

  metrics = Metrics();
  EXPECT_TRUE(metrics.contains(expected));
}
| |
| |
// This test ensures that resource allocation is done according to each role's
// weight. This is done by having six agents and three frameworks and making
// sure each framework gets the appropriate number of resources.
TEST_F(HierarchicalAllocatorTest, UpdateWeight)
{
  // Pausing the clock is not necessary, but ensures that the test
  // doesn't rely on the batch allocation in the allocator, which
  // would slow down the test.
  Clock::pause();

  initialize();

  // Define some constants to make the code read easily.
  const string SINGLE_RESOURCE = "cpus:2;mem:1024";
  const string DOUBLE_RESOURCES = "cpus:4;mem:2048";
  const string TRIPLE_RESOURCES = "cpus:6;mem:3072";
  const string FOURFOLD_RESOURCES = "cpus:8;mem:4096";
  const string TOTAL_RESOURCES = "cpus:12;mem:6144";

  // Drains `allocationsCount` allocations from the queue, recording
  // each allocation per framework and summing the allocated
  // resources. If `recoverResources` is true, every allocation is
  // immediately returned to the allocator so that the resources can
  // be offered again in the next cycle.
  auto awaitAllocationsAndRecoverResources = [this](
      Resources& totalAllocatedResources,
      hashmap<FrameworkID, Allocation>& frameworkAllocations,
      int allocationsCount,
      bool recoverResources) {
    for (int i = 0; i < allocationsCount; i++) {
      Future<Allocation> allocation = allocations.get();
      AWAIT_READY(allocation);

      frameworkAllocations[allocation.get().frameworkId] = allocation.get();
      totalAllocatedResources += Resources::sum(allocation.get().resources);

      if (recoverResources) {
        // Recover the allocated resources so they can be offered
        // again next time.
        foreachpair (const SlaveID& slaveId,
                     const Resources& resources,
                     allocation.get().resources) {
          allocator->recoverResources(
              allocation.get().frameworkId,
              slaveId,
              resources,
              None());
        }
      }
    }
  };

  // Register six agents with the same resources (cpus:2;mem:1024).
  vector<SlaveInfo> agents;
  for (size_t i = 0; i < 6; i++) {
    SlaveInfo agent = createSlaveInfo(SINGLE_RESOURCE);
    agents.push_back(agent);
    allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});
  }

  // Total cluster resources (6 agents): cpus=12, mem=6144.

  // Framework1 registers with 'role1' which uses the default weight (1.0),
  // and all resources will be offered to this framework since it is the only
  // framework running so far.
  FrameworkInfo framework1 = createFrameworkInfo("role1");
  allocator->addFramework(framework1.id(), framework1, {});

  // Framework2 registers with 'role2' which also uses the default weight.
  // It will not get any offers due to all resources having outstanding offers
  // to framework1 when it registered.
  FrameworkInfo framework2 = createFrameworkInfo("role2");
  allocator->addFramework(framework2.id(), framework2, {});

  Future<Allocation> allocation = allocations.get();
  AWAIT_READY(allocation);

  // role1 share = 1 (cpus=12, mem=6144)
  //   framework1 share = 1
  // role2 share = 0
  //   framework2 share = 0

  ASSERT_EQ(allocation.get().frameworkId, framework1.id());
  ASSERT_EQ(6u, allocation.get().resources.size());
  EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(),
            Resources::sum(allocation.get().resources));

  // Recover all resources so they can be offered again next time.
  foreachpair (const SlaveID& slaveId,
               const Resources& resources,
               allocation.get().resources) {
    allocator->recoverResources(
        allocation.get().frameworkId,
        slaveId,
        resources,
        None());
  }

  // Tests whether `framework1` and `framework2` each get half of the resources
  // when their roles' weights are 1:1.
  {
    // Advance the clock and trigger a batch allocation.
    Clock::advance(flags.allocation_interval);

    // role1 share = 0.5 (cpus=6, mem=3072)
    //   framework1 share = 1
    // role2 share = 0.5 (cpus=6, mem=3072)
    //   framework2 share = 1

    // Ensure that all resources are offered equally between both frameworks,
    // since each framework's role has a weight of 1.0 by default.
    hashmap<FrameworkID, Allocation> frameworkAllocations;
    Resources totalAllocatedResources;
    awaitAllocationsAndRecoverResources(totalAllocatedResources,
        frameworkAllocations, 2, true);

    // Framework1 should get one allocation with three agents.
    ASSERT_EQ(3u, frameworkAllocations[framework1.id()].resources.size());
    EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
              Resources::sum(frameworkAllocations[framework1.id()].resources));

    // Framework2 should also get one allocation with three agents.
    ASSERT_EQ(3u, frameworkAllocations[framework2.id()].resources.size());
    EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
              Resources::sum(frameworkAllocations[framework2.id()].resources));

    // Check to ensure that these two allocations sum to the total resources;
    // this check can ensure there are only two allocations in this case.
    EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources);
  }

  // Tests whether `framework1` gets 1/3 of the resources and `framework2` gets
  // 2/3 of the resources when their roles' weights are 1:2.
  {
    // Update the weight of framework2's role to 2.0.
    vector<WeightInfo> weightInfos;
    weightInfos.push_back(createWeightInfo(framework2.role(), 2.0));
    allocator->updateWeights(weightInfos);

    // 'updateWeights' will trigger the allocation immediately, so it does not
    // need to manually advance the clock here.

    // role1 share = 0.33 (cpus=4, mem=2048)
    //   framework1 share = 1
    // role2 share = 0.66 (cpus=8, mem=4096)
    //   framework2 share = 1

    // Now that the frameworks' weights are 1:2, ensure that all
    // resources are offered with a ratio of 1:2 between both frameworks.
    hashmap<FrameworkID, Allocation> frameworkAllocations;
    Resources totalAllocatedResources;
    awaitAllocationsAndRecoverResources(totalAllocatedResources,
        frameworkAllocations, 2, true);

    // Framework1 should get one allocation with two agents.
    ASSERT_EQ(2u, frameworkAllocations[framework1.id()].resources.size());
    EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
              Resources::sum(frameworkAllocations[framework1.id()].resources));

    // Framework2 should get one allocation with four agents.
    ASSERT_EQ(4u, frameworkAllocations[framework2.id()].resources.size());
    EXPECT_EQ(Resources::parse(FOURFOLD_RESOURCES).get(),
              Resources::sum(frameworkAllocations[framework2.id()].resources));

    // Check to ensure that these two allocations sum to the total resources;
    // this check can ensure there are only two allocations in this case.
    EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources);
  }

  // Tests whether `framework1` gets 1/6 of the resources, `framework2` gets
  // 2/6 of the resources and `framework3` gets 3/6 of the resources when their
  // roles' weights are 1:2:3.
  {
    // Add a new role with a weight of 3.0.
    vector<WeightInfo> weightInfos;
    weightInfos.push_back(createWeightInfo("role3", 3.0));
    allocator->updateWeights(weightInfos);

    // 'updateWeights' will not trigger the allocation immediately because no
    // framework exists in 'role3' yet.

    // Framework3 registers with 'role3'.
    FrameworkInfo framework3 = createFrameworkInfo("role3");
    allocator->addFramework(framework3.id(), framework3, {});

    // 'addFramework' will trigger the allocation immediately, so it does not
    // need to manually advance the clock here.

    // role1 share = 0.166 (cpus=2, mem=1024)
    //   framework1 share = 1
    // role2 share = 0.333 (cpus=4, mem=2048)
    //   framework2 share = 1
    // role3 share = 0.50 (cpus=6, mem=3072)
    //   framework3 share = 1

    // Currently, there are three frameworks and six agents in this cluster,
    // and the weight ratio of these frameworks is 1:2:3, therefore frameworks
    // will get the proper resource ratio of 1:2:3.
    hashmap<FrameworkID, Allocation> frameworkAllocations;
    Resources totalAllocatedResources;
    awaitAllocationsAndRecoverResources(totalAllocatedResources,
        frameworkAllocations, 3, false);

    // Framework1 should get one allocation with one agent.
    ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
    EXPECT_EQ(Resources::parse(SINGLE_RESOURCE).get(),
              Resources::sum(frameworkAllocations[framework1.id()].resources));

    // Framework2 should get one allocation with two agents.
    ASSERT_EQ(2u, frameworkAllocations[framework2.id()].resources.size());
    EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
              Resources::sum(frameworkAllocations[framework2.id()].resources));

    // Framework3 should get one allocation with three agents.
    ASSERT_EQ(3u, frameworkAllocations[framework3.id()].resources.size());
    EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
              Resources::sum(frameworkAllocations[framework3.id()].resources));

    // Check to ensure that these three allocations sum to the total resources;
    // this check can ensure there are only three allocations in this case.
    EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources);
  }
}
| |
| |
| // This test checks that if a framework declines resources with a |
| // long filter, it will be offered filtered resources again after |
| // reviving offers. |
| TEST_F(HierarchicalAllocatorTest, ReviveOffers) |
| { |
| // Pausing the clock is not necessary, but ensures that the test |
| // doesn't rely on the batch allocation in the allocator, which |
| // would slow down the test. |
| Clock::pause(); |
| |
| initialize(); |
| |
| // Total cluster resources will become cpus=2, mem=1024. |
| SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0"); |
| allocator->addSlave(agent.id(), agent, None(), agent.resources(), {}); |
| |
| // Framework will be offered all of agent's resources since it is |
| // the only framework running so far. |
| FrameworkInfo framework = createFrameworkInfo("role1"); |
| allocator->addFramework(framework.id(), framework, {}); |
| |
| Future<Allocation> allocation = allocations.get(); |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| |
| Filters filter1000s; |
| filter1000s.set_refuse_seconds(1000.); |
| allocator->recoverResources( |
| framework.id(), |
| agent.id(), |
| agent.resources(), |
| filter1000s); |
| |
| // Advance the clock to trigger a batch allocation. |
| Clock::advance(flags.allocation_interval); |
| Clock::settle(); |
| |
| allocation = allocations.get(); |
| EXPECT_TRUE(allocation.isPending()); |
| |
| allocator->reviveOffers(framework.id()); |
| |
| // Framework will be offered all of agent's resources again |
| // after reviving offers. |
| AWAIT_READY(allocation); |
| EXPECT_EQ(framework.id(), allocation.get().frameworkId); |
| EXPECT_EQ(agent.resources(), Resources::sum(allocation.get().resources)); |
| } |
| |
| |
// Fixture for the value-parameterized allocator benchmarks below.
// The test parameter is an (agent count, framework count) tuple.
class HierarchicalAllocator_BENCHMARK_Test
  : public HierarchicalAllocatorTestBase,
    public WithParamInterface<std::tr1::tuple<size_t, size_t>> {};
| |
| |
// The Hierarchical Allocator benchmark tests are parameterized
// by the number of agents and the number of frameworks.
// First tuple element: agent count; second: framework count. The
// benchmarks run over the full cross product of the two axes.
INSTANTIATE_TEST_CASE_P(
    SlaveAndFrameworkCount,
    HierarchicalAllocator_BENCHMARK_Test,
    ::testing::Combine(
        ::testing::Values(1000U, 5000U, 10000U, 20000U, 30000U, 50000U),
        ::testing::Values(1U, 50U, 100U, 200U, 500U, 1000U, 3000U, 6000U))
    );
| |
| |
// TODO(bmahler): Should also measure how expensive it is to
// add a framework after the slaves are added.
TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
{
  size_t slaveCount = std::tr1::get<0>(GetParam());
  size_t frameworkCount = std::tr1::get<1>(GetParam());

  vector<SlaveInfo> slaves;
  slaves.reserve(slaveCount);

  vector<FrameworkInfo> frameworks;
  frameworks.reserve(frameworkCount);

  for (size_t i = 0; i < slaveCount; i++) {
    slaves.push_back(createSlaveInfo(
        "cpus:2;mem:1024;disk:4096;ports:[31000-32000]"));
  }

  // All frameworks register in the default '*' role; the revocable
  // capability lets them receive the oversubscribed resources added
  // via `updateSlave` below.
  for (size_t i = 0; i < frameworkCount; i++) {
    frameworks.push_back(createFrameworkInfo(
        "*",
        {FrameworkInfo::Capability::REVOCABLE_RESOURCES}));
  }

  cout << "Using " << slaveCount << " agents"
       << " and " << frameworkCount << " frameworks" << endl;

  Clock::pause();

  // Count offers rather than storing them; the benchmark only
  // asserts on how many offer callbacks fired.
  atomic<size_t> offerCallbacks(0);

  auto offerCallback = [&offerCallbacks](
      const FrameworkID& frameworkId,
      const hashmap<SlaveID, Resources>& resources) {
    offerCallbacks++;
  };

  initialize(master::Flags(), offerCallback);

  Stopwatch watch;
  watch.start();

  foreach (const FrameworkInfo& framework, frameworks) {
    allocator->addFramework(framework.id(), framework, {});
  }

  // Wait for all the `addFramework` operations to be processed.
  Clock::settle();

  watch.stop();

  cout << "Added " << frameworkCount << " frameworks"
       << " in " << watch.elapsed() << endl;

  // A fixed slice of each agent's resources is marked as already
  // in use so the agents are added in a partially-allocated state.
  const Resources slaveResources = Resources::parse(
      "cpus:1;mem:128;disk:1024;"
      "ports:[31126-31510,31512-31623,31810-31852,31854-31964]").get();

  watch.start();

  // Add the slaves, use round-robin to choose which framework
  // to allocate a slice of the slave's resources to.
  for (size_t i = 0; i < slaves.size(); i++) {
    hashmap<FrameworkID, Resources> used;

    used[frameworks[i % frameworkCount].id()] = slaveResources;

    allocator->addSlave(
        slaves[i].id(),
        slaves[i],
        None(),
        slaves[i].resources(),
        used);
  }

  // Wait for all the `addSlave` operations to be processed.
  Clock::settle();

  watch.stop();

  // Each `addSlave` should have produced exactly one offer.
  ASSERT_EQ(slaveCount, offerCallbacks.load());

  cout << "Added " << slaveCount << " agents"
       << " in " << watch.elapsed() << endl;

  // Oversubscribed resources on each slave. Calling
  // `mutable_revocable()` marks the `cpus` resource revocable.
  Resource oversubscribed = Resources::parse("cpus", "10", "*").get();
  oversubscribed.mutable_revocable();

  watch.start(); // Reset.

  foreach (const SlaveInfo& slave, slaves) {
    allocator->updateSlave(slave.id(), oversubscribed);
  }

  // Wait for all the `updateSlave` operations to be processed.
  Clock::settle();

  watch.stop();

  // Each `updateSlave` should have produced one additional offer.
  ASSERT_EQ(slaveCount * 2, offerCallbacks.load());

  cout << "Updated " << slaveCount << " agents in " << watch.elapsed() << endl;
}
| |
| |
| // This benchmark simulates a number of frameworks that have a fixed amount of |
| // work to do. Once they have reached their targets, they start declining all |
| // subsequent offers. |
| TEST_P(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers) |
| { |
| size_t slaveCount = std::tr1::get<0>(GetParam()); |
| size_t frameworkCount = std::tr1::get<1>(GetParam()); |
| |
| master::Flags flags; |
| |
| // Choose an interval longer than the time we expect a single cycle to take so |
| // that we don't back up the process queue. |
| flags.allocation_interval = Hours(1); |
| |
| // Pause the clock because we want to manually drive the allocations. |
| Clock::pause(); |
| |
| struct OfferedResources { |
| FrameworkID frameworkId; |
| SlaveID slaveId; |
| Resources resources; |
| }; |
| |
| vector<OfferedResources> offers; |
| |
| auto offerCallback = [&offers]( |
| const FrameworkID& frameworkId, |
| const hashmap<SlaveID, Resources>& resources_) |
| { |
| foreach (auto resources, resources_) { |
| offers.push_back( |
| OfferedResources{frameworkId, resources.first, resources.second}); |
| } |
| }; |
| |
| cout << "Using " << slaveCount << " agents and " |
| << frameworkCount << " frameworks" << endl; |
| |
| vector<SlaveInfo> slaves; |
| slaves.reserve(slaveCount); |
| |
| vector<FrameworkInfo> frameworks; |
| frameworks.reserve(frameworkCount); |
| |
| initialize(flags, offerCallback); |
| |
| Stopwatch watch; |
| watch.start(); |
| |
| for (size_t i = 0; i < frameworkCount; i++) { |
| frameworks.push_back(createFrameworkInfo("*")); |
| allocator->addFramework(frameworks[i].id(), frameworks[i], {}); |
| } |
| |
| // Wait for all the `addFramework` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << frameworkCount << " frameworks in " |
| << watch.elapsed() << endl; |
| |
| Resources resources = Resources::parse( |
| "cpus:16;mem:2014;disk:1024").get(); |
| |
| Try<::mesos::Value::Ranges> ranges = fragment(createRange(31000, 32000), 16); |
| ASSERT_SOME(ranges); |
| ASSERT_EQ(16, ranges->range_size()); |
| |
| resources += createPorts(ranges.get()); |
| |
| watch.start(); |
| |
| for (size_t i = 0; i < slaveCount; i++) { |
| slaves.push_back(createSlaveInfo( |
| "cpus:24;mem:4096;disk:4096;ports:[31000-32000]")); |
| |
| // Add some used resources on each slave. Let's say there are 16 tasks, each |
| // is allocated 1 cpu and a random port from the port range. |
| hashmap<FrameworkID, Resources> used; |
| used[frameworks[i % frameworkCount].id()] = resources; |
| allocator->addSlave( |
| slaves[i].id(), slaves[i], None(), slaves[i].resources(), used); |
| } |
| |
| // Wait for all the `addSlave` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << slaveCount << " agents in " |
| << watch.elapsed() << endl; |
| |
| size_t declinedOfferCount = 0; |
| |
| // Loop enough times for all the frameworks to get offered all the resources. |
| for (size_t i = 0; i < frameworkCount * 2; i++) { |
| // Permanently decline any offered resources. |
| foreach (auto offer, offers) { |
| Filters filters; |
| |
| filters.set_refuse_seconds(INT_MAX); |
| allocator->recoverResources( |
| offer.frameworkId, offer.slaveId, offer.resources, filters); |
| } |
| |
| declinedOfferCount += offers.size(); |
| |
| // Wait for the declined offers. |
| Clock::settle(); |
| offers.clear(); |
| |
| watch.start(); |
| |
| // Advance the clock and trigger a background allocation cycle. |
| Clock::advance(flags.allocation_interval); |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "round " << i |
| << " allocate() took " << watch.elapsed() |
| << " to make " << offers.size() << " offers" |
| << " after filtering " << declinedOfferCount << " offers" << endl; |
| } |
| |
| Clock::resume(); |
| } |
| |
| |
// Returns the requested number of labels, formed by appending the
// zero-based index to both `key` and `value`:
// [{"<key>0": "<value>0"}, ..., {"<key><count-1>": "<value><count-1>"}]
static Labels createLabels(
    const string& key,
    const string& value,
    size_t count)
{
  Labels labels;

  for (size_t i = 0; i < count; i++) {
    const string index = stringify(i);
    labels.add_labels()->CopyFrom(createLabel(key + index, value + index));
  }

  return labels;
}
| |
| |
| // TODO(neilc): Refactor to reduce code duplication with `DeclineOffers` test. |
| TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels) |
| { |
| size_t slaveCount = std::tr1::get<0>(GetParam()); |
| size_t frameworkCount = std::tr1::get<1>(GetParam()); |
| |
| master::Flags flags; |
| |
| // Choose an interval longer than the time we expect a single cycle to take so |
| // that we don't back up the process queue. |
| flags.allocation_interval = Hours(1); |
| |
| // Pause the clock because we want to manually drive the allocations. |
| Clock::pause(); |
| |
| struct OfferedResources { |
| FrameworkID frameworkId; |
| SlaveID slaveId; |
| Resources resources; |
| }; |
| |
| vector<OfferedResources> offers; |
| |
| auto offerCallback = [&offers]( |
| const FrameworkID& frameworkId, |
| const hashmap<SlaveID, Resources>& resources_) |
| { |
| foreach (auto resources, resources_) { |
| offers.push_back( |
| OfferedResources{frameworkId, resources.first, resources.second}); |
| } |
| }; |
| |
| cout << "Using " << slaveCount << " agents and " |
| << frameworkCount << " frameworks" << endl; |
| |
| vector<SlaveInfo> slaves; |
| slaves.reserve(slaveCount); |
| |
| vector<FrameworkInfo> frameworks; |
| frameworks.reserve(frameworkCount); |
| |
| initialize(flags, offerCallback); |
| |
| Stopwatch watch; |
| watch.start(); |
| |
| for (size_t i = 0; i < frameworkCount; i++) { |
| frameworks.push_back(createFrameworkInfo("role1")); |
| allocator->addFramework(frameworks[i].id(), frameworks[i], {}); |
| } |
| |
| // Wait for all the `addFramework` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << frameworkCount << " frameworks in " |
| << watch.elapsed() << endl; |
| |
| // Create the used resources at each slave. We use three blocks of |
| // resources: unreserved mem/disk/ports, and two different labeled |
| // reservations with distinct labels. We choose the labels so that |
| // the last label (in storage order) is different, which is the |
| // worst-case for the equality operator. We also ensure that the |
| // labels at any two nodes are distinct, which means they can't be |
| // aggregated easily by the master/allocator. |
| Resources resources = Resources::parse("mem:2014;disk:1024").get(); |
| |
| Try<::mesos::Value::Ranges> ranges = fragment(createRange(31000, 32000), 16); |
| ASSERT_SOME(ranges); |
| ASSERT_EQ(16, ranges->range_size()); |
| |
| resources += createPorts(ranges.get()); |
| |
| watch.start(); |
| |
| for (size_t i = 0; i < slaveCount; i++) { |
| slaves.push_back(createSlaveInfo( |
| "cpus:24;mem:4096;disk:4096;ports:[31000-32000]")); |
| |
| Resources agentResources = resources; |
| |
| // We create reservations with 12 labels as we expect this is |
| // more than most frameworks use. Note that only the 12th |
| // label differs between the two sets of labels as this triggers |
| // the pathological performance path in the Labels equality |
| // operator. |
| // |
| // We add a unique id to each agent's reservation labels to |
| // ensure that any aggregation across agents leads to |
| // pathological performance (reservations with distinct labels |
| // cannot be merged). |
| // |
| // TODO(neilc): Test with longer key / value lengths. |
| Labels labels1 = createLabels("key", "value", 11); |
| labels1.add_labels()->CopyFrom( |
| createLabel("unique_key_1", "value_" + stringify(i))); |
| |
| Labels labels2 = createLabels("key", "value", 11); |
| labels1.add_labels()->CopyFrom( |
| createLabel("unique_key_2", "value_" + stringify(i))); |
| |
| Resources reserved1 = |
| createReservedResource("cpus", "8", "role1", |
| createReservationInfo("principal1", labels1)); |
| Resources reserved2 = |
| createReservedResource("cpus", "8", "role1", |
| createReservationInfo("principal1", labels2)); |
| |
| agentResources += reserved1; |
| agentResources += reserved2; |
| |
| // Add some used resources on each slave. Let's say there are 16 tasks, each |
| // is allocated 1 cpu and a random port from the port range. |
| hashmap<FrameworkID, Resources> used; |
| used[frameworks[i % frameworkCount].id()] = agentResources; |
| allocator->addSlave( |
| slaves[i].id(), slaves[i], None(), slaves[i].resources(), used); |
| } |
| |
| // Wait for all the `addSlave` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << slaveCount << " agents in " |
| << watch.elapsed() << endl; |
| |
| size_t declinedOfferCount = 0; |
| |
| // Loop enough times for all the frameworks to get offered all the resources. |
| for (size_t i = 0; i < frameworkCount * 2; i++) { |
| // Permanently decline any offered resources. |
| foreach (auto offer, offers) { |
| Filters filters; |
| |
| filters.set_refuse_seconds(INT_MAX); |
| allocator->recoverResources( |
| offer.frameworkId, offer.slaveId, offer.resources, filters); |
| } |
| |
| declinedOfferCount += offers.size(); |
| |
| // Wait for the declined offers. |
| Clock::settle(); |
| offers.clear(); |
| |
| watch.start(); |
| |
| // Advance the clock and trigger a background allocation cycle. |
| Clock::advance(flags.allocation_interval); |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "round " << i |
| << " allocate() took " << watch.elapsed() |
| << " to make " << offers.size() << " offers" |
| << " after filtering " << declinedOfferCount << " offers" << endl; |
| } |
| |
| Clock::resume(); |
| } |
| |
| |
| // This benchmark measures the effects of framework suppression |
| // on allocation times. |
| TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers) |
| { |
| size_t agentCount = std::tr1::get<0>(GetParam()); |
| size_t frameworkCount = std::tr1::get<1>(GetParam()); |
| |
| // Pause the clock because we want to manually drive the allocations. |
| Clock::pause(); |
| |
| struct Allocation |
| { |
| FrameworkID frameworkId; |
| SlaveID slaveId; |
| Resources resources; |
| }; |
| |
| vector<Allocation> allocations; |
| |
| auto offerCallback = [&allocations]( |
| const FrameworkID& frameworkId, |
| const hashmap<SlaveID, Resources>& resources) |
| { |
| foreachpair (const SlaveID& slaveId, const Resources& r, resources) { |
| Allocation allocation; |
| allocation.frameworkId = frameworkId; |
| allocation.slaveId = slaveId; |
| allocation.resources = r; |
| |
| allocations.push_back(std::move(allocation)); |
| } |
| }; |
| |
| cout << "Using " << agentCount << " agents and " |
| << frameworkCount << " frameworks" << endl; |
| |
| master::Flags flags; |
| initialize(flags, offerCallback); |
| |
| vector<FrameworkInfo> frameworks; |
| frameworks.reserve(frameworkCount); |
| |
| Stopwatch watch; |
| watch.start(); |
| |
| for (size_t i = 0; i < frameworkCount; i++) { |
| frameworks.push_back(createFrameworkInfo("*")); |
| allocator->addFramework(frameworks[i].id(), frameworks[i], {}); |
| } |
| |
| // Wait for all the `addFramework` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << frameworkCount << " frameworks" |
| << " in " << watch.elapsed() << endl; |
| |
| vector<SlaveInfo> agents; |
| agents.reserve(agentCount); |
| |
| // Each agent has a portion of it's resources allocated to a single |
| // framework. We round-robin through the frameworks when allocating. |
| Resources allocation = Resources::parse("cpus:16;mem:1024;disk:1024").get(); |
| |
| Try<::mesos::Value::Ranges> ranges = fragment(createRange(31000, 32000), 16); |
| ASSERT_SOME(ranges); |
| ASSERT_EQ(16, ranges->range_size()); |
| |
| allocation += createPorts(ranges.get()); |
| |
| watch.start(); |
| |
| for (size_t i = 0; i < agentCount; i++) { |
| agents.push_back(createSlaveInfo( |
| "cpus:24;mem:4096;disk:4096;ports:[31000-32000]")); |
| |
| hashmap<FrameworkID, Resources> used; |
| used[frameworks[i % frameworkCount].id()] = allocation; |
| |
| allocator->addSlave( |
| agents[i].id(), agents[i], None(), agents[i].resources(), used); |
| } |
| |
| // Wait for all the `addSlave` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << agentCount << " agents" |
| << " in " << watch.elapsed() << endl; |
| |
| // Now perform allocations. Each time we trigger an allocation run, we |
| // increase the number of frameworks that are suppressing offers. To |
| // ensure the test can run in a timely manner, we always perform a |
| // fixed number of allocations. |
| // |
| // TODO(jjanco): Parameterize this test by allocationsCount, not an arbitrary |
| // number. Batching reduces loop size, lowering time to test completion. |
| size_t allocationsCount = 5; |
| size_t suppressCount = 0; |
| |
| for (size_t i = 0; i < allocationsCount; ++i) { |
| // Recover resources with no filters because we want to test the |
| // effect of suppression alone. |
| foreach (const Allocation& allocation, allocations) { |
| allocator->recoverResources( |
| allocation.frameworkId, |
| allocation.slaveId, |
| allocation.resources, |
| None()); |
| } |
| |
| // Wait for all declined offers to be processed. |
| Clock::settle(); |
| allocations.clear(); |
| |
| // Suppress another batch of frameworks. For simplicity and readability |
| // we loop on allocationsCount. The implication here is that there can be |
| // 'frameworkCount % allocationsCount' of frameworks not suppressed. For |
| // the purposes of the benchmark this is not an issue. |
| for (size_t j = 0; j < frameworkCount / allocationsCount; ++j) { |
| allocator->suppressOffers(frameworks[suppressCount].id()); |
| ++suppressCount; |
| } |
| |
| // Wait for all the `suppressOffers` operations to be processed |
| // so we only measure the allocation time. |
| Clock::settle(); |
| |
| watch.start(); |
| |
| // Advance the clock and trigger a batch allocation. |
| Clock::advance(flags.allocation_interval); |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "allocate() took " << watch.elapsed() |
| << " to make " << allocations.size() << " offers with " |
| << suppressCount << " out of " |
| << frameworkCount << " frameworks suppressing offers" |
| << endl; |
| } |
| |
| Clock::resume(); |
| } |
| |
| |
| // Measures the processing time required for the allocator metrics. |
| // |
| // TODO(bmahler): Add allocations to this benchmark. |
| TEST_P(HierarchicalAllocator_BENCHMARK_Test, Metrics) |
| { |
| size_t slaveCount = std::tr1::get<0>(GetParam()); |
| size_t frameworkCount = std::tr1::get<1>(GetParam()); |
| |
| // Pause the clock because we want to manually drive the allocations. |
| Clock::pause(); |
| |
| initialize(); |
| |
| Stopwatch watch; |
| watch.start(); |
| |
| for (size_t i = 0; i < frameworkCount; i++) { |
| string role = stringify(i); |
| allocator->setQuota(role, createQuota(role, "cpus:1;mem:512;disk:256")); |
| } |
| |
| // Wait for all the `setQuota` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Set quota for " << frameworkCount << " roles in " |
| << watch.elapsed() << endl; |
| |
| watch.start(); |
| |
| for (size_t i = 0; i < frameworkCount; i++) { |
| FrameworkInfo framework = createFrameworkInfo(stringify(i)); |
| allocator->addFramework(framework.id(), framework, {}); |
| } |
| |
| // Wait for all the `addFramework` operations to be processed. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << frameworkCount << " frameworks in " |
| << watch.elapsed() << endl; |
| |
| watch.start(); |
| |
| for (size_t i = 0; i < slaveCount; i++) { |
| SlaveInfo slave = createSlaveInfo("cpus:16;mem:2048;disk:1024"); |
| allocator->addSlave(slave.id(), slave, None(), slave.resources(), {}); |
| } |
| |
| // Wait for all the `addSlave` operations to complete. |
| Clock::settle(); |
| |
| watch.stop(); |
| |
| cout << "Added " << slaveCount << " agents in " |
| << watch.elapsed() << endl; |
| |
| // TODO(bmahler): Avoid timing the JSON parsing here. |
| // Ideally we also avoid timing the HTTP layer. |
| watch.start(); |
| JSON::Object metrics = Metrics(); |
| watch.stop(); |
| |
| cout << "/metrics/snapshot took " << watch.elapsed() |
| << " for " << slaveCount << " agents" |
| << " and " << frameworkCount << " frameworks" << endl; |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |