blob: a240a2bd51be5bda7f230c37c86431a0aff9173c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/allocator/allocator.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/queue.hpp>
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
#include <stout/hashmap.hpp>
#include <stout/stopwatch.hpp>
#include "common/resource_quantities.hpp"
#include "master/constants.hpp"
#include "master/allocator/mesos/hierarchical.hpp"
#include "tests/allocator.hpp"
#include "tests/mesos.hpp"
using mesos::internal::master::MIN_CPUS;
using mesos::internal::master::MIN_MEM;
using mesos::internal::master::allocator::HierarchicalDRFAllocator;
using mesos::internal::ResourceQuantities;
using mesos::internal::slave::AGENT_CAPABILITIES;
using mesos::allocator::Allocator;
using mesos::allocator::Options;
using process::Clock;
using process::Future;
using std::cout;
using std::endl;
using std::make_shared;
using std::ostream;
using std::set;
using std::shared_ptr;
using std::string;
using std::vector;
using testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
// TODO(kapil): Add support for per-framework-profile configuration for
// offer acceptance/rejection.
struct FrameworkProfile
{
FrameworkProfile(
const string& _name,
const set<string>& _roles,
size_t _instances,
// For frameworks that do not care about launching tasks, we provide
// some default task launch settings.
size_t _maxTasksPerInstance = 100,
const Resources& _taskResources =
CHECK_NOTERROR(Resources::parse("cpus:1;mem:100")),
size_t _maxTasksPerOffer = 10)
: name(_name),
roles(_roles),
instances(_instances),
maxTasksPerInstance(_maxTasksPerInstance),
taskResources(_taskResources),
maxTasksPerOffer(_maxTasksPerOffer) {}
string name;
set<string> roles;
size_t instances;
const size_t maxTasksPerInstance;
Resources taskResources;
const size_t maxTasksPerOffer;
};
struct AgentProfile
{
  // Describes a group of identical agents: a base name, how many to
  // register, and the resources each one advertises.
  //
  // TODO(mzhu): Add option to specify `used` resources. `used` resources
  // requires the knowledge of `frameworkId` which currently is created
  // during the initialization which is after the agent profile creation.
  AgentProfile(const string& profileName,
               size_t instanceCount,
               const Resources& agentResources)
    : name(profileName),
      instances(instanceCount),
      resources(agentResources) {}

  string name;          // Base name; instances get a "-<i>" suffix.
  size_t instances;     // Number of agents to add.
  Resources resources;  // Resources advertised by each agent.
};
// A single allocation made by the allocator: the resources of one agent
// offered to one framework under one role. Queued by the base fixture's
// default offer callback for the benchmark loop to consume.
struct OfferedResources
{
  FrameworkID frameworkId;  // Framework the resources were offered to.
  SlaveID slaveId;          // Agent the resources come from.
  Resources resources;      // The offered resources themselves.
  string role;              // Role the allocation was made under.
};
// Configuration for one benchmark run: which allocator and sorters to
// use, the batch allocation interval, and the agent/framework profiles
// that make up the simulated cluster.
struct BenchmarkConfig
{
  BenchmarkConfig(const string& allocator_ = master::DEFAULT_ALLOCATOR,
                  const string& roleSorter_ = "drf",
                  const string& frameworkSorter_ = "drf",
                  const Duration& allocationInterval_ =
                    master::DEFAULT_ALLOCATION_INTERVAL)
    : allocator(allocator_),
      roleSorter(roleSorter_),
      frameworkSorter(frameworkSorter_),
      allocationInterval(allocationInterval_)
  {
    // Default minimum allocatable quantities, derived from the master's
    // MIN_CPUS and MIN_MEM constants (MIN_MEM is converted to megabytes
    // to match the "mem" resource's unit).
    minAllocatableResources.push_back(CHECK_NOTERROR(
        ResourceQuantities::fromString("cpus:" + stringify(MIN_CPUS))));
    minAllocatableResources.push_back(
        CHECK_NOTERROR(ResourceQuantities::fromString(
            "mem:" +
            stringify(
                static_cast<double>(MIN_MEM.bytes()) / Bytes::MEGABYTES))));
  }

  string allocator;             // Allocator implementation name.
  string roleSorter;            // Sorter used across roles.
  string frameworkSorter;       // Sorter used across frameworks in a role.
  Duration allocationInterval;  // Interval between batch allocations.
  vector<ResourceQuantities> minAllocatableResources;
  vector<FrameworkProfile> frameworkProfiles;
  vector<AgentProfile> agentProfiles;
};
class HierarchicalAllocations_BenchmarkBase : public ::testing::Test
{
protected:
HierarchicalAllocations_BenchmarkBase ()
: totalTasksToLaunch(0) {}
~HierarchicalAllocations_BenchmarkBase () override
{
delete allocator;
}
void initializeCluster(
const BenchmarkConfig& config,
Option<lambda::function<
void(const FrameworkID&,
const hashmap<string, hashmap<SlaveID, Resources>>&)>>
offerCallback = None())
{
bool clockPaused = Clock::paused();
// If clock was not paused, pause the clock so that we could
// make measurements.
if (!clockPaused) {
Clock::pause();
}
if (offerCallback.isNone()) {
offerCallback =
[this](
const FrameworkID& frameworkId,
const hashmap<string, hashmap<SlaveID, Resources>>& resources_) {
foreachkey (const string& role, resources_) {
foreachpair (
const SlaveID& slaveId,
const Resources& resources,
resources_.at(role)) {
offers.put(
OfferedResources{frameworkId, slaveId, resources, role});
}
}
};
}
allocator = CHECK_NOTERROR(Allocator::create(
config.allocator, config.roleSorter, config.frameworkSorter));
Options options;
options.allocationInterval = config.allocationInterval;
options.minAllocatableResources = config.minAllocatableResources;
allocator->initialize(
options,
CHECK_NOTNONE(offerCallback),
{});
Stopwatch watch;
watch.start();
// Add agents.
size_t agentCount = 0;
for (const AgentProfile& profile : config.agentProfiles) {
agentCount += profile.instances;
for (size_t i = 0; i < profile.instances; i++) {
const string agentName = profile.name + "-" + stringify(i);
SlaveInfo agent;
*(agent.mutable_resources()) = profile.resources;
agent.mutable_id()->set_value(agentName);
agent.set_hostname(agentName);
allocator->addSlave(
agent.id(),
agent,
AGENT_CAPABILITIES(),
None(),
agent.resources(),
{});
}
}
// Wait for all the `addSlave` operations to be processed.
Clock::settle();
watch.stop();
cout << "Added " << agentCount << " agents"
<< " in " << watch.elapsed() << endl;
// Pause the allocator here to prevent any event-driven allocations while
// adding frameworks.
allocator->pause();
watch.start();
// Add frameworks.
size_t frameworkCount = 0;
for (const FrameworkProfile& profile : config.frameworkProfiles) {
totalTasksToLaunch += profile.instances * profile.maxTasksPerInstance;
frameworkCount += profile.instances;
shared_ptr<FrameworkProfile> sharedProfile =
make_shared<FrameworkProfile>(profile);
for (size_t i = 0; i < profile.instances; i++) {
const string frameworkName = profile.name + "-" + stringify(i);
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_name(frameworkName);
frameworkInfo.mutable_id()->set_value(frameworkName);
frameworkInfo.clear_roles();
for (const string& role : profile.roles) {
frameworkInfo.add_roles(role);
}
frameworkProfiles[frameworkInfo.id()] = sharedProfile;
allocator->addFramework(
frameworkInfo.id(),
frameworkInfo,
{},
true,
{});
}
}
// Wait for all the `addFramework` operations to be processed.
Clock::settle();
watch.stop();
cout << "Added " << frameworkCount << " frameworks"
<< " in " << watch.elapsed() << endl;
// Resume the clock if it was not paused.
if (!clockPaused) {
Clock::resume();
}
allocator->resume();
}
const FrameworkProfile& getFrameworkProfile(const FrameworkID& id)
{
return *frameworkProfiles[id];
}
Allocator* allocator;
process::Queue<OfferedResources> offers;
size_t totalTasksToLaunch;
private:
hashmap<FrameworkID, shared_ptr<FrameworkProfile>> frameworkProfiles;
};
// Concrete fixture for the non-parameterized allocation benchmark below.
class BENCHMARK_HierarchicalAllocations :
public HierarchicalAllocations_BenchmarkBase {};
// This benchmark launches frameworks with different profiles (number of tasks,
// task sizes and etc.) and prints out statistics such as total tasks launched,
// cluster utilization and allocation latency. The test has a timeout of 30
// seconds.
TEST_F(BENCHMARK_HierarchicalAllocations, MultiFrameworkAllocations)
{
  // Pause the clock because we want to manually drive the allocations.
  Clock::pause();

  BenchmarkConfig config;

  // Add agent profiles.
  config.agentProfiles.push_back(AgentProfile(
      "agent",
      80,
      CHECK_NOTERROR(Resources::parse("cpus:64;mem:488000"))));

  // Add framework profiles.

  // A profile to simulate frameworks that launch thousands of smaller apps.
  // It is similar in behavior to some meta-frameworks such as Marathon. This
  // also limits max tasks per offer to 100 to spread the load more uniformly
  // over allocation cycles.
  for (size_t i = 0; i < 4; i++) {
    config.frameworkProfiles.push_back(FrameworkProfile(
        "Marathon-" + stringify(i + 1),
        {"roleA-" + stringify(i + 1)},
        1,
        4000,
        CHECK_NOTERROR(Resources::parse("cpus:0.07;mem:400;")),
        100));
  }

  // A profile to simulate workloads where a large number of frameworks launch a
  // handful of tasks. E.g., a large number of Jenkins masters which are trying
  // to launch some build jobs. We enforce a task-per-offer limit of 1.
  config.frameworkProfiles.push_back(FrameworkProfile(
      "Jenkins",
      {"roleB"},
      500,
      5,
      CHECK_NOTERROR(Resources::parse("cpus:0.1;mem:4000;")),
      1));

  // A profile to simulate workloads where frameworks launch larger jobs that
  // are similar in spirit to Spark dispatcher/driver model. We enforce a
  // task-per-offer limit of 5.
  for (size_t i = 0; i < 50; i++) {
    config.frameworkProfiles.push_back(FrameworkProfile(
        "SparkDispatcher-" + stringify(i + 1),
        {"roleC-" + stringify(i + 1)},
        1,
        20,
        CHECK_NOTERROR(Resources::parse("cpus:1;mem:1000;")),
        5));
  }

  initializeCluster(config);

  cout << "Start allocation\n";

  // Now perform allocations. We continue until either we timeout or we have
  // launched all of the expected tasks.
  const Duration TEST_TIMEOUT = Seconds(30);

  // Total tasks launched per framework. Used to enforce task caps.
  hashmap<FrameworkID, size_t> frameworkTasksLaunched;

  Stopwatch totalTime;
  totalTime.start();

  size_t allocationCount = 0;
  size_t totalTasksLaunched = 0;

  // Running total of resources consumed by simulated task launches;
  // reported at the end against cluster capacity.
  Resources clusterAllocation;

  while (totalTasksLaunched < totalTasksToLaunch &&
         totalTime.elapsed() < TEST_TIMEOUT) {
    Stopwatch watch;
    watch.start();

    // Advance the clock and trigger a batch allocation cycle.
    Clock::advance(config.allocationInterval);
    Clock::settle();

    watch.stop();

    allocationCount++;

    // Drain all offers generated by this allocation cycle; each offer
    // is "accepted" by simulating as many task launches as the offered
    // resources and the framework's per-profile caps allow.
    Future<OfferedResources> offer_ = offers.get();
    size_t offerCount = 0;
    while (offer_.isReady()) {
      const OfferedResources& offer = offer_.get();
      const FrameworkID& frameworkId = offer.frameworkId;
      const FrameworkProfile& frameworkProfile =
        getFrameworkProfile(frameworkId);

      offerCount++;

      Resources remainingResources = offer.resources;

      // We strip allocation information of `remainingResources` so that we
      // can compare/subtract with `frameworkProfile.taskResources`.
      remainingResources.unallocate();

      size_t tasksLaunched = 0;

      while (remainingResources.contains(frameworkProfile.taskResources) &&
             frameworkTasksLaunched[frameworkId] <
               frameworkProfile.maxTasksPerInstance &&
             tasksLaunched < frameworkProfile.maxTasksPerOffer) {
        remainingResources -= frameworkProfile.taskResources;
        frameworkTasksLaunched[frameworkId]++;
        totalTasksLaunched++;
        tasksLaunched++;
        clusterAllocation += frameworkProfile.taskResources;
      }

      // We restore the allocation information to recover the resources.
      remainingResources.allocate(offer.role);
      allocator->recoverResources(
          frameworkId,
          offer.slaveId,
          remainingResources,
          None());

      offer_ = offers.get();
    }

    cout << "Launched " << totalTasksLaunched << " tasks out of "
         << totalTasksToLaunch << " total tasks in "
         << allocationCount << " rounds. Current allocation round generated "
         << offerCount << " offers and took " << watch.elapsed() << endl;
  }

  if (totalTasksLaunched < totalTasksToLaunch) {
    cout << "Failed to launch all tasks: Timed out after "
         << TEST_TIMEOUT << endl;
  }

  // Compute and print statistics around cluster capacity, cluster allocation,
  // and allocation target.
  Resources clusterCapacity;
  for (const AgentProfile& profile : config.agentProfiles) {
    for (size_t i = 0; i < profile.instances; i++) {
      clusterCapacity += profile.resources;
    }
  }

  Resources targetAllocation;
  for (const FrameworkProfile& profile : config.frameworkProfiles) {
    for (size_t i = 0; i < profile.instances; i++) {
      for (size_t j = 0; j < profile.maxTasksPerInstance; j++) {
        targetAllocation += profile.taskResources;
      }
    }
  }

  cout << "Resource statistics:\n";
  cout << "Cluster capacity: " << clusterCapacity << endl;
  cout << "Cluster allocation: " << clusterAllocation << endl;
  cout << "Target allocation: " << targetAllocation << endl;
}
struct QuotaParam
{
  // Parameters for the quota benchmark: how many roles get a small
  // quota, how many get a large quota, and how many frameworks
  // subscribe to each role.
  QuotaParam(
      const size_t smallRoles,
      const size_t largeRoles,
      const size_t frameworks)
    : smallQuotaRoleCount(smallRoles),
      largeQuotaRoleCount(largeRoles),
      frameworksPerRole(frameworks) {}

  // `smallQuotaRole` will have a resource quota of 1/5 an agent, this will
  // lead to agent chopping.
  //
  // `largeQuotaRole` will have a quota of 5 agents, this will lead to
  // role unsatisfied quota chopping.
  //
  // Together, this determines the number of agents to be:
  //
  //   `smallQuotaRoleCount / 5 + largeQuotaRoleCount * 5`
  //
  // TODO(mzhu): Consider adding another parameter to control the chopping.
  const size_t smallQuotaRoleCount;
  const size_t largeQuotaRoleCount;

  // Number of frameworks per role.
  const size_t frameworksPerRole;
};
// Parameterized fixture for the quota benchmark below; the parameter
// controls role counts and frameworks per role (see `QuotaParam`).
class BENCHMARK_HierarchicalAllocator_WithQuotaParam
: public HierarchicalAllocations_BenchmarkBase,
public WithParamInterface<QuotaParam> {};
// Since the quota performance is mostly decided by number of agents and
// roles, in the default setting below, we let each role have a single
// framework.
INSTANTIATE_TEST_CASE_P(
    QuotaParam,
    BENCHMARK_HierarchicalAllocator_WithQuotaParam,
    ::testing::Values(
        QuotaParam(25U, 5U, 1U),        // 30 agents in total
        QuotaParam(250U, 50U, 1U),      // 300 agents in total
        QuotaParam(2500U, 500U, 1U)));  // 3000 agents in total
// This benchmark evaluates the allocator performance in the presence of
// roles with both small quota (which can be satisfied by half an agent) as
// well as large quota (which need resources from two agents). We setup the
// cluster, trigger one allocation cycle and measure the elapsed time. All
// cluster resources will be offered in this cycle to satisfy all roles' quota.
TEST_P(BENCHMARK_HierarchicalAllocator_WithQuotaParam, LargeAndSmallQuota)
{
  // Pause the clock because we want to manually drive the allocations.
  Clock::pause();

  // Each agent can satisfy either two `smallQuotaRole` or half of
  // `largeQuotaRole`.
  const string agentResourcesString = "cpus:2;mem:2048;disk:2048";
  const string smallQuotaResourcesString = "cpus:1;mem:1024;disk:1024";
  const string largeQuotaResourcesString = "cpus:4;mem:4096;disk:4096";

  BenchmarkConfig config;

  // Store roles for setting up quota later (after allocator is initialized).
  vector<string> smallQuotaRoles;

  // Add framework profiles for `smallQuotaRole`.
  for (size_t i = 0; i < GetParam().smallQuotaRoleCount; i++) {
    string role("smallQuotaRole" + stringify(i));
    smallQuotaRoles.push_back(role);
    config.frameworkProfiles.push_back(FrameworkProfile(
        "framework_small_" + stringify(i),
        {role},
        GetParam().frameworksPerRole));
  }

  // Store roles for setting up quota later (after allocator is initialized).
  vector<string> largeQuotaRoles;

  // Add framework profiles for `largeQuotaRole`.
  for (size_t i = 0; i < GetParam().largeQuotaRoleCount; i++) {
    string role("largeQuotaRole" + stringify(i));
    largeQuotaRoles.push_back(role);
    config.frameworkProfiles.push_back(FrameworkProfile(
        "framework_large_" + stringify(i),
        {role},
        GetParam().frameworksPerRole));
  }

  // See comment above in `QuotaParam` regarding how we decide agent counts.
  size_t agentCount =
    GetParam().smallQuotaRoleCount / 5 + GetParam().largeQuotaRoleCount * 5;

  // Add agent profiles.
  config.agentProfiles.push_back(AgentProfile(
      "agent",
      agentCount,
      CHECK_NOTERROR(Resources::parse(agentResourcesString))));

  initializeCluster(config);

  size_t totalRoleCount =
    GetParam().smallQuotaRoleCount + GetParam().largeQuotaRoleCount;

  cout << "Benchmark setup: " << agentCount << " agents, " << totalRoleCount
       << " roles, " << totalRoleCount * GetParam().frameworksPerRole
       << " frameworks" << endl;

  // Pause the allocator here to prevent any event-driven allocations while
  // setting up the quota (while setting quota currently does not lead to
  // event-driven allocations, this behavior might change in the future).
  allocator->pause();

  foreach (const string& role, smallQuotaRoles) {
    allocator->setQuota(role, createQuota(role, smallQuotaResourcesString));
  }

  foreach (const string& role, largeQuotaRoles) {
    allocator->setQuota(role, createQuota(role, largeQuotaResourcesString));
  }

  allocator->resume();

  cout << "Start allocation" << endl;

  Stopwatch watch;
  watch.start();

  // Advance the clock and trigger a batch allocation cycle.
  Clock::advance(config.allocationInterval);
  Clock::settle();

  watch.stop();

  // Drain all offers that are already ready; only the measured first
  // cycle should have produced any.
  size_t offerCount = 0;
  while (offers.get().isReady()) {
    ++offerCount;
  }

  cout << "Made " << offerCount << " allocations in " << watch.elapsed()
       << endl;

  watch.start();

  // Advance the clock and trigger a batch allocation cycle.
  Clock::advance(config.allocationInterval);
  Clock::settle();

  watch.stop();

  // No allocations should be made because all resources were allocated in the
  // first round.
  EXPECT_TRUE(offers.get().isPending());

  cout << "Made 0 allocation in " << watch.elapsed() << endl;
}
struct NonQuotaVsQuotaParam
{
  // Parameters for the nonquota-vs-quota comparison benchmark. The same
  // cluster shape is run twice, toggled by `setQuota`.
  NonQuotaVsQuotaParam(
      const size_t roles,
      const size_t agents,
      const size_t frameworks,
      const bool quota)
    : roleCount(roles),
      agentsPerRole(agents),
      frameworksPerRole(frameworks),
      setQuota(quota) {}

  const size_t roleCount;

  // This determines the number of agents needed to satisfy a role's quota.
  // And total number of agents in the cluster will be
  // roleCount * agentsPerRole.
  const size_t agentsPerRole;

  const size_t frameworksPerRole;

  // Whether quotas are set for the roles in this run.
  const bool setQuota;
};
// Parameterized fixture for the nonquota-vs-quota benchmark below
// (see `NonQuotaVsQuotaParam`).
class BENCHMARK_HierarchicalAllocator_WithNonQuotaVsQuotaParam
: public HierarchicalAllocations_BenchmarkBase,
public WithParamInterface<NonQuotaVsQuotaParam> {};
// Each cluster size is instantiated twice, once without and once with
// quota, so the two runs can be compared directly.
INSTANTIATE_TEST_CASE_P(
    NonQuotaVsQuotaParam,
    BENCHMARK_HierarchicalAllocator_WithNonQuotaVsQuotaParam,
    ::testing::Values(
        // 10 roles, 10*2 = 20 agents, 10*2 = 20 frameworks,
        // without and with quota.
        NonQuotaVsQuotaParam(10U, 2U, 2U, false),
        NonQuotaVsQuotaParam(10U, 2U, 2U, true),
        // 100 roles, 100*2 = 200 agents, 100*2 = 200 frameworks.
        // without and with quota.
        NonQuotaVsQuotaParam(100U, 2U, 2U, false),
        NonQuotaVsQuotaParam(100U, 2U, 2U, true),
        // 1000 roles, 1000*2 = 2000 agents, 1000*2 = 2000 frameworks.
        // without and with quota.
        NonQuotaVsQuotaParam(1000U, 2U, 2U, false),
        NonQuotaVsQuotaParam(1000U, 2U, 2U, true)));
// This benchmark evaluates the performance difference between nonquota
// and quota settings. In both settings, the same allocations are made
// for fair comparison. In particular, since the agent will always be
// allocated as a whole in nonquota settings, we should also avoid
// agent chopping in quota setting as well. Thus in this benchmark,
// quotas are only set to be multiples of whole agent resources.
// This is also why we have this dedicated benchmark for comparison
// rather than extending the existing quota benchmarks (which involves
// agent chopping).
TEST_P(
    BENCHMARK_HierarchicalAllocator_WithNonQuotaVsQuotaParam, NonQuotaVsQuota)
{
  // Pause the clock because we want to manually drive the allocations.
  Clock::pause();

  const string agentResourcesString = "cpus:2;mem:2048;disk:2048";

  BenchmarkConfig config;

  // Store roles for setting up quota later if needed.
  vector<string> roles;

  for (size_t i = 0; i < GetParam().roleCount; i++) {
    string role("role" + stringify(i));
    roles.push_back(role);
    config.frameworkProfiles.push_back(FrameworkProfile(
        "framework_" + stringify(i), {role}, GetParam().frameworksPerRole));
  }

  size_t agentCount = GetParam().roleCount * GetParam().agentsPerRole;

  // Add agent profiles.
  config.agentProfiles.push_back(AgentProfile(
      "agent",
      agentCount,
      CHECK_NOTERROR(Resources::parse(agentResourcesString))));

  initializeCluster(config);

  if (GetParam().setQuota) {
    // We ensure the same allocations are made for both nonquota and quota
    // settings for fair comparison. Thus quota is set to consume multiple
    // agents and each agent will be offered as a whole.
    //
    // NOTE: the quantities below are `agentsPerRole` times the agent's
    // "cpus:2;mem:2048;disk:2048" resources.
    const string quotaResourcesString =
      "cpus:" + stringify(2 * GetParam().agentsPerRole) +
      ";mem:" + stringify(2048 * GetParam().agentsPerRole) +
      ";disk:" + stringify(2048 * GetParam().agentsPerRole);

    // Pause the allocator here to prevent any event-driven allocations while
    // setting up the quota (while setting quota currently does not lead to
    // event-driven allocations, this behavior might change in the future).
    allocator->pause();

    foreach (const string& role, roles) {
      allocator->setQuota(role, createQuota(role, quotaResourcesString));
    }

    allocator->resume();

    cout << "Quota run setup: ";
  } else {
    cout << "Nonquota run setup: ";
  }

  cout << agentCount << " agents, " << GetParam().roleCount << " roles, "
       << GetParam().roleCount * GetParam().frameworksPerRole << " frameworks"
       << endl;

  Stopwatch watch;
  watch.start();

  // Advance the clock and trigger a batch allocation cycle.
  Clock::advance(config.allocationInterval);
  Clock::settle();

  watch.stop();

  // Drain all offers that are already ready; only the measured first
  // cycle should have produced any.
  size_t offerCount = 0;
  while (offers.get().isReady()) {
    offerCount++;
  }

  cout << "Made " << offerCount << " allocations in " << watch.elapsed()
       << endl;

  watch.start();

  // Advance the clock and trigger a batch allocation cycle.
  Clock::advance(config.allocationInterval);
  Clock::settle();

  watch.stop();

  // No allocations should be made because all resources were allocated in the
  // first round.
  EXPECT_TRUE(offers.get().isPending());

  cout << "Made 0 allocation in " << watch.elapsed() << endl;
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {