// blob: cd84cd24d3587fafc01ae1861f22c47262f2d7e9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <limits>
#include <map>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/allocator/allocator.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/some.hpp>
#include <stout/strings.hpp>
#include "master/constants.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
#include "master/allocator/mesos/hierarchical.hpp"
#include "master/detector/standalone.hpp"
#include "tests/allocator.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
#include "tests/mock_slave.hpp"
#include "tests/resources_utils.hpp"
using mesos::internal::master::allocator::HierarchicalDRFAllocator;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using process::Clock;
using process::Future;
using process::Owned;
using process::PID;
using std::map;
using std::string;
using std::vector;
using testing::_;
using testing::AtMost;
using testing::DoAll;
using testing::DoDefault;
namespace mesos {
namespace internal {
namespace tests {
class ReservationTest : public MesosTest
{
public:
  // Returns a future for the resources carried by the next
  // `ApplyOperationMessage` that is sent to `to`. The resources are
  // taken from the RESERVE, UNRESERVE, CREATE or DESTROY payload of
  // the message and have `unallocate()` applied before being returned.
  // Any other operation type is not expected here and hits
  // `UNREACHABLE()`.
  template <typename To>
  Future<Resources> getOperationMessage(To to)
  {
    return FUTURE_PROTOBUF(ApplyOperationMessage(), _, to)
      .then([](const ApplyOperationMessage& message) {
        // Takes a copy of the operation's resources and returns the
        // copy after calling `unallocate()` on it.
        auto unallocated = [](Resources resources) {
          resources.unallocate();
          return resources;
        };

        const auto& operation = message.operation_info();

        // Every enumerator is listed explicitly (no `default`), as in
        // the original switch.
        switch (operation.type()) {
          case Offer::Operation::RESERVE:
            return unallocated(operation.reserve().resources());
          case Offer::Operation::UNRESERVE:
            return unallocated(operation.unreserve().resources());
          case Offer::Operation::CREATE:
            return unallocated(operation.create().volumes());
          case Offer::Operation::DESTROY:
            return unallocated(operation.destroy().volumes());
          case Offer::Operation::UNKNOWN:
          case Offer::Operation::LAUNCH:
          case Offer::Operation::LAUNCH_GROUP:
          case Offer::Operation::CREATE_DISK:
          case Offer::Operation::DESTROY_DISK:
          case Offer::Operation::GROW_VOLUME:
          case Offer::Operation::SHRINK_VOLUME:
            UNREACHABLE();
        }

        UNREACHABLE();
      });
  }
};
// This tests that a framework can send back a Reserve operation
// as a response to an offer, which updates the resources in the
// allocator and results in the reserved resources being reoffered to
// the framework. The framework then sends back an Unreserve
// operation to unreserve the reserved resources. Finally, we test
// that the framework receives the unreserved resources.
TEST_F(ReservationTest, ReserveThenUnreserve)
{
// The framework uses the role "role"; the same role is assigned to
// `masterFlags.roles` and is used for the dynamic reservation below.
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:1;mem:512";
// Capture the `UpdateSlaveMessage` so that the test only proceeds to
// start the framework once this message has been seen.
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// We use the filter explicitly here so that the resources will not
// be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
// `unreserved` equals the full set of agent resources configured
// above; `dynamicallyReserved` is the same set with a dynamic
// reservation for the framework's role and principal pushed onto it.
Resources unreserved = Resources::parse("cpus:1;mem:512").get();
Resources dynamicallyReserved =
unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// We use this to capture offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the first offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// The expectation for the next offer. It is installed before the
// offer is accepted so that the follow-up offer cannot be missed.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Reserve the resources.
driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
// In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(dynamicallyReserved, frameworkInfo.roles(0))));
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Unreserve the resources.
driver.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);
// In the next offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
driver.stop();
driver.join();
}
// This tests for failures arising from floating point precision errors during
// processing of resource reservation requests. The test first asks a framework
// to send a resource reservation request in response to an offer, which updates
// the resources in the allocator and results in resources being re-offered to
// the framework. The framework then sends back a new resource reservation
// request which involves a floating point value for the resources being
// reserved, which in turn triggers a problematic floating point comparison.
TEST_F(ReservationTest, ReserveTwiceWithDoubleValue)
{
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
// The agent has more resources than the two reservations made below
// (each "cpus:0.1;mem:512") consume in total.
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:24;mem:4096";
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// We use the filter explicitly here so that the resources will not
// be filtered (default would be 5 seconds).
Filters filters;
filters.set_refuse_seconds(0);
// The amount reserved in each of the two RESERVE operations below.
// The CPU value (0.1) is deliberately fractional.
Resources unreserved = Resources::parse("cpus:0.1;mem:512").get();
Resources dynamicallyReserved =
unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// Used to capture the offers passed to 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
EXPECT_CALL(sched, registered(&driver, _, _));
driver.start();
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
// In the first offer, expect an offer with unreserved resources.
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// First iteration: Reserving 0.1 CPU.
driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
// In the second offer, expect an offer with reserved resources.
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(dynamicallyReserved, frameworkInfo.roles(0))));
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Second iteration: Reserving second 0.1 CPU.
driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
// The agent should be able to calculate the remaining resources
// correctly at this point. If the floating point comparison on the
// agent isn't correct it will end up failing `CHECKS` on the agent,
// potentially crashing the agent. See MESOS-3552.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
// The offer is checked for a reservation of 0.2 CPUs, i.e. the sum
// of the two 0.1 CPU reservations made above.
Resources reserved = Resources::parse("cpus:0.2;mem:512").get();
Resources finalReservation =
reserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(finalReservation, frameworkInfo.roles(0))));
driver.stop();
driver.join();
}
// This tests that a framework can send back a Reserve followed by a
// LaunchTasks operation as a response to an offer, which
// updates the resources in the allocator then proceeds to launch the
// task with the reserved resources. The reserved resources are
// reoffered to the framework on task completion. The framework then
// sends back an Unreserve operation to unreserve the reserved
// resources. We test that the framework receives the unreserved
// resources.
TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
{
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
// The agent is started with a test containerizer that wraps a mock
// executor, so the executor callbacks can be scripted further below.
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:1;mem:512";
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), &containerizer, slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// `dynamicallyReserved` is `unreserved` with a dynamic reservation
// for the framework's role and principal pushed onto it.
Resources unreserved = Resources::parse("cpus:1;mem:512").get();
Resources dynamicallyReserved =
unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// We use this to capture offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the first offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// Create a task. Note that the task asks for the *reserved*
// resources, which only exist once the RESERVE operation sent in the
// same `acceptOffers` call below has been applied.
TaskInfo taskInfo =
createTask(offer.slave_id(), dynamicallyReserved, "exit 1", exec.id);
EXPECT_CALL(exec, registered(_, _, _, _));
// On launch, the mock executor answers with a TASK_FINISHED status
// update.
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(DoDefault());
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Reserve the resources and launch the tasks.
driver.acceptOffers(
{offer.id()},
{RESERVE(dynamicallyReserved),
LAUNCH({taskInfo})});
// In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(dynamicallyReserved, frameworkInfo.roles(0))));
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// We use the filter explicitly here so that the resources will not
// be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
// Unreserve the resources.
driver.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);
// In the next offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// The executor may or may not be asked to shut down before the test
// finishes; both outcomes are accepted.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
// This test launches 2 frameworks in the same role. framework1
// reserves resources by sending back a Reserve operation. We
// first test that framework1 receives the reserved resources, then on
// the next resource offer, framework1 declines the offer. This
// should lead to framework2 receiving the resources that framework1
// reserved.
TEST_F(ReservationTest, ReserveShareWithinRole)
{
  // Both frameworks share this role; the reservation made by
  // framework1 is created for this role as well.
  string role = "role";

  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo1.set_name("framework1");
  frameworkInfo1.set_roles(0, role);

  // NOTE: The name must be set on `frameworkInfo2` here. Setting it on
  // `frameworkInfo1` a second time would leave framework1 named
  // "framework2" and framework2 with the default name.
  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo2.set_name("framework2");
  frameworkInfo2.set_roles(0, role);

  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = role;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags slaveFlags = CreateSlaveFlags();
  slaveFlags.resources = "cpus:1;mem:512";

  // Capture the `UpdateSlaveMessage` so that the frameworks are only
  // started after this message has been seen.
  Future<UpdateSlaveMessage> updateSlaveMessage =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(slave);

  AWAIT_READY(updateSlaveMessage);

  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);

  MockScheduler sched2;
  MesosSchedulerDriver driver2(
      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);

  // `dynamicallyReserved` is `unreserved` with a dynamic reservation
  // for the shared role and framework1's principal pushed onto it.
  Resources unreserved = Resources::parse("cpus:1;mem:512").get();
  Resources dynamicallyReserved = unreserved.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo1.principal()));

  // We use this to capture offers from 'resourceOffers'.
  Future<vector<Offer>> offers;

  EXPECT_CALL(sched1, registered(&driver1, _, _));

  // The expectation for the first offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers));

  driver1.start();

  // In the first offer, expect an offer with unreserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  Offer offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(unreserved, frameworkInfo1.roles(0))));

  // The expectation for the next offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers));

  // We use the filter explicitly here so that the resources will not
  // be filtered for 5 seconds (the default).
  Filters filters;
  filters.set_refuse_seconds(0);

  // Reserve the resources.
  driver1.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);

  // In the next offer, expect an offer with reserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo1.roles(0))));

  // The filter to decline the offer "forever".
  Filters filtersForever;
  filtersForever.set_refuse_seconds(std::numeric_limits<double>::max());

  // Decline the offer "forever" in order to force framework2 to
  // receive the resources.
  driver1.declineOffer(offer.id(), filtersForever);

  // Only now is the second framework started; its expectations are
  // installed before `driver2.start()`.
  EXPECT_CALL(sched2, registered(&driver2, _, _));

  // The expectation for the next offer.
  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
    .WillOnce(FutureArg<1>(&offers));

  driver2.start();

  // In the next offer, expect an offer with the resources reserved by
  // framework1.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo1.roles(0))));

  driver1.stop();
  driver1.join();

  driver2.stop();
  driver2.join();
}
// This tests that a Reserve operation where the specified resources
// do not exist in the given offer (too large, in this case) is
// dropped.
TEST_F(ReservationTest, DropReserveTooLarge)
{
// A test allocator is handed to the master so that expectations can
// be placed on the calls the master makes into the allocator.
TestAllocator<> allocator;
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
EXPECT_CALL(allocator, initialize(_, _, _));
Try<Owned<cluster::Master>> master = StartMaster(&allocator, masterFlags);
ASSERT_SOME(master);
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:1;mem:512";
EXPECT_CALL(allocator, addSlave(_, _, _, _, _, _));
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// `unreservedTooLarge` asks for mem:1024, whereas the agent was
// configured with only mem:512 above.
Resources unreserved = Resources::parse("cpus:1;mem:512").get();
Resources unreservedTooLarge = Resources::parse("cpus:1;mem:1024").get();
Resources dynamicallyReservedTooLarge =
unreservedTooLarge.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// We use this to capture the offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the first offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
EXPECT_CALL(allocator, addFramework(_, _, _, _, _));
driver.start();
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
// Expect that the reserve operation will be dropped, i.e. the
// allocator must never be asked to update the allocation.
EXPECT_CALL(allocator, updateAllocation(_, _, _, _))
.Times(0);
// We use the filter explicitly here so that the resources will not
// be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
// Attempt to reserve resources that are too large for the offer.
driver.acceptOffers(
{offer.id()},
{RESERVE(dynamicallyReservedTooLarge)},
filters);
// In the next offer, still expect an offer with the unreserved
// resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
driver.stop();
driver.join();
}
// This test verifies that the slave checkpoints the resources for
// dynamic reservations to the disk, recovers them upon restart, and
// sends them to the master during re-registration.
TEST_F(ReservationTest, ResourcesCheckpointing)
{
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
// NOTE(review): "reconnect" presumably makes the restarted agent
// recover its checkpointed state; confirm against the agent flags.
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.recover = "reconnect";
slaveFlags.resources = "cpus:8;mem:4096";
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// Only part of the agent's memory (2048 of 4096) is reserved.
Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
Resources reserved = unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// We use this to capture offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
// Expect an offer with the unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// Capture the resources of the next `ApplyOperationMessage` that is
// sent to the agent.
Future<Resources> message = getOperationMessage(slave.get()->pid);
// We use the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
// Reserve the resources.
driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
// Expect to receive the operation message.
AWAIT_READY(message);
// Restart the slave without shutting down.
slave.get()->terminate();
// Both futures are set up before the agent is started again so that
// neither the recovery nor the re-registration can be missed.
Future<ReregisterSlaveMessage> reregisterSlave =
FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
Future<Nothing> slaveRecover = FUTURE_DISPATCH(_, &Slave::recover);
slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
// Expect the slave to recover.
AWAIT_READY(slaveRecover);
// Expect to receive the 'ReregisterSlaveMessage' containing the
// reserved resources.
AWAIT_READY(reregisterSlave);
// Work on a mutable copy of the message.
// NOTE(review): `upgradeResources` presumably converts the resources
// in the message to the format used by `reserved` before the
// comparison; confirm against its definition.
ReregisterSlaveMessage reregisterSlave_ = reregisterSlave.get();
upgradeResources(&reregisterSlave_);
EXPECT_EQ(reregisterSlave_.checkpointed_resources(), reserved);
driver.stop();
driver.join();
}
// This test verifies the case where a slave that has checkpointed
// dynamic reservations reregisters with a failed over master, and the
// dynamic reservations are later correctly offered to the framework.
TEST_F(ReservationTest, MasterFailover)
{
// Pause the clock and control it manually in order to
// control the timing of the offer cycle.
Clock::pause();
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
Try<Owned<cluster::Master>> master1 = StartMaster(masterFlags);
ASSERT_SOME(master1);
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:8;mem:2048";
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
// A standalone detector is used so that the test can later point the
// agent and the scheduler driver at the second master via `appoint`.
StandaloneMasterDetector detector(master1.get()->pid);
Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
ASSERT_SOME(slave);
// The clock is paused, so advance it by the registration backoff
// factor and settle before waiting for the `UpdateSlaveMessage`.
Clock::advance(slaveFlags.registration_backoff_factor);
Clock::settle();
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
// Here the reservation covers all of the agent's resources.
Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
Resources reserved = unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// We use this to capture offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the first offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
// Advance the clock to generate an offer.
Clock::advance(masterFlags.allocation_interval);
Clock::settle();
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
// Capture the resources of the next `ApplyOperationMessage` that is
// sent to the agent.
Future<Resources> message = getOperationMessage(slave.get()->pid);
// We use the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
// Reserve the resources.
driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
// Expect to receive the operation message.
AWAIT_READY(message);
// This is to make sure operation message is processed.
Clock::settle();
EXPECT_CALL(sched, disconnected(&driver));
// Simulate master failover by restarting the master.
master1->reset();
Try<Owned<cluster::Master>> master2 = StartMaster(masterFlags);
ASSERT_SOME(master2);
// The scheduler is expected to register again, this time with the
// second master.
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the next offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Simulate a new master detected event on the slave so that the
// slave will do a re-registration.
detector.appoint(master2.get()->pid);
// Ensure agent registration is processed.
Clock::advance(slaveFlags.authentication_backoff_factor);
Clock::advance(slaveFlags.registration_backoff_factor);
Clock::settle();
// From here on the clock runs normally again; the next offer is
// awaited below without further manual clock advances.
Clock::resume();
// In the next offer, expect an offer with the reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(reserved, frameworkInfo.roles(0))));
driver.stop();
driver.join();
}
// NOTE: The following tests cover the notion of compatible resources
// on slave restart. Newly declared resources are compatible if they
// include the checkpointed resources. For example, suppose a slave
// initially declares "cpus:8;mem:4096", and "cpus:8;mem:2048" gets
// reserved and thus checkpointed. In order to be compatible, the
// newly declared resources must include "cpus:8;mem:2048". For
// example, "cpus:12;mem:2048" would be considered compatible.
// This test verifies that a slave can restart as long as the
// checkpointed resources it recovers are compatible with the slave
// resources specified using the '--resources' flag.
TEST_F(ReservationTest, CompatibleCheckpointedResources)
{
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:8;mem:4096";
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
StandaloneMasterDetector detector(master.get()->pid);
// NOTE(review): the trailing `true` appears to request a mock agent
// (see the `mock()` check below) that has to be started explicitly;
// confirm against the `StartSlave` overload.
Try<Owned<cluster::Slave>> slave1 = StartSlave(
&detector,
&containerizer,
slaveFlags,
true);
ASSERT_SOME(slave1);
ASSERT_NE(nullptr, slave1.get()->mock());
slave1.get()->start();
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// Only part of the agent's memory (2048 of 4096) is reserved.
Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
Resources reserved = unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// We use this to capture offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
// The expectation for the first offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved, frameworkInfo.roles(0))));
// Capture the resources of the next `ApplyOperationMessage`,
// regardless of its destination.
Future<Resources> message = getOperationMessage(_);
// We use the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
// Reserve the resources.
driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
// Wait until the operation message arrives.
AWAIT_READY(message);
slave1.get()->terminate();
// Simulate a reboot of the slave machine by modifying the boot ID.
ASSERT_SOME(os::write(slave::paths::getBootIdPath(
slave::paths::getMetaRootDir(slaveFlags.work_dir)),
"rebooted! ;)"));
// Change the slave resources so that it is compatible with the
// checkpointed resources.
slaveFlags.resources = "cpus:12;mem:2048";
Try<Owned<cluster::Slave>> slave2 = StartSlave(
&detector,
&containerizer,
slaveFlags,
true);
ASSERT_SOME(slave2);
ASSERT_NE(nullptr, slave2.get()->mock());
// Intercept `__recover` on the mock agent in order to capture the
// future that is passed to it. The expectation is installed before
// the agent is started.
Future<Future<Nothing>> recover;
EXPECT_CALL(*slave2.get()->mock(), __recover(_))
.WillOnce(DoAll(FutureArg<0>(&recover), Return()));
slave2.get()->start();
// Wait for 'recover' to finish.
AWAIT_READY(recover);
// Expect 'recover' to have completed successfully.
AWAIT_READY(recover.get());
slave2.get()->terminate();
// The executor may or may not be asked to shut down before the test
// finishes; both outcomes are accepted.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
// This test verifies that a slave can restart as long as the
// checkpointed resources (including persistent volumes) it recovers
// are compatible with the slave resources specified using the
// '--resources' flag.
//
// Flow: start a slave, reserve resources and create a persistent
// volume on it, terminate the slave, overwrite its boot ID file, then
// start a second slave from the same flags with a different
// '--resources' value and check that its recovery completes
// successfully.
TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
{
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_roles(0, "role");
// The master is configured with the framework's role and a 5ms
// allocation interval.
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(5);
masterFlags.roles = frameworkInfo.roles(0);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
// Initial slave resources; these are changed before the restart below.
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resources = "cpus:8;mem:4096;disk:2048";
// Set up the capture of the 'UpdateSlaveMessage' before the slave is
// started so that the message cannot be missed.
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
MockExecutor exec(DEFAULT_EXECUTOR_ID);
TestContainerizer containerizer(&exec);
StandaloneMasterDetector detector(master.get()->pid);
// NOTE(review): the trailing 'true' presumably requests a mocked
// slave that is not started automatically (it is started explicitly
// below) -- confirm against the 'StartSlave' overload.
Try<Owned<cluster::Slave>> slave1 = StartSlave(
&detector,
&containerizer,
slaveFlags,
true);
ASSERT_SOME(slave1);
ASSERT_NE(nullptr, slave1.get()->mock());
slave1.get()->start();
AWAIT_READY(updateSlaveMessage);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
// cpus/mem: the unreserved form and the same amounts dynamically
// reserved for the framework's role and principal.
Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
Resources reserved = unreserved.pushReservation(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
// disk: 1024 unreserved, the same amount dynamically reserved, and a
// persistent volume built on top of the reserved disk.
Resource unreservedDisk = Resources::parse("disk", "1024", "*").get();
Resource reservedDisk = unreservedDisk;
reservedDisk.add_reservations()->CopyFrom(createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
Resource volume = reservedDisk;
volume.mutable_disk()->CopyFrom(createDiskInfo(
"persistence_id",
"container_path",
None(),
None(),
None(),
DEFAULT_CREDENTIAL.principal()));
// We use this to capture offers from 'resourceOffers'.
Future<vector<Offer>> offers;
EXPECT_CALL(sched, registered(&driver, _, _));
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
// The first offer must contain the unreserved cpus/mem and disk.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(unreserved + unreservedDisk, frameworkInfo.roles(0))));
// NOTE(review): 'message2' is set up before 'message1' on purpose;
// presumably the most recently registered capture matches first, so
// 'message1' receives the first operation message (RESERVE) and
// 'message2' the second (CREATE) -- confirm against
// 'getOperationMessage'.
Future<Resources> message2 = getOperationMessage(_);
Future<Resources> message1 = getOperationMessage(_);
// We use the filter explicitly here so that the resources
// will not be filtered for 5 seconds (the default).
Filters filters;
filters.set_refuse_seconds(0);
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
// Reserve the resources and create the volume.
driver.acceptOffers(
{offer.id()},
{RESERVE(reserved + reservedDisk), CREATE(volume)},
filters);
// NOTE: Currently, we send one message per operation. But this is
// an implementation detail which is subject to change.
AWAIT_READY(message1);
EXPECT_TRUE(message1->contains(reserved + reservedDisk));
AWAIT_READY(message2);
EXPECT_TRUE(message2->contains(volume));
// Expect an offer containing the volume.
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(
allocatedResources(reserved + volume, frameworkInfo.roles(0))));
// Terminating the slave is expected to rescind the outstanding offer.
Future<OfferID> rescindedOfferId;
EXPECT_CALL(sched, offerRescinded(&driver, _))
.WillOnce(FutureArg<1>(&rescindedOfferId));
slave1.get()->terminate();
AWAIT_READY(rescindedOfferId);
EXPECT_EQ(rescindedOfferId.get(), offer.id());
// Simulate a reboot of the slave machine by modifying the boot ID.
ASSERT_SOME(os::write(slave::paths::getBootIdPath(
slave::paths::getMetaRootDir(slaveFlags.work_dir)),
"rebooted! ;)"));
// Change the slave resources so that it is compatible with the
// checkpointed resources.
slaveFlags.resources = "cpus:12;mem:2048;disk:1024";
// The second slave reuses 'slaveFlags' (and therefore the same
// 'work_dir'); only the '--resources' value differs.
Try<Owned<cluster::Slave>> slave2 = StartSlave(
&detector,
&containerizer,
slaveFlags,
true);
ASSERT_SOME(slave2);
ASSERT_NE(nullptr, slave2.get()->mock());
// Capture the future passed to '__recover' on the mocked slave; the
// outer future is satisfied when '__recover' is invoked and the inner
// one carries the outcome of recovery.
Future<Future<Nothing>> recover;
EXPECT_CALL(*slave2.get()->mock(), __recover(_))
.WillOnce(DoAll(FutureArg<0>(&recover), Return()));
slave2.get()->start();
// Wait for 'recover' to finish.
AWAIT_READY(recover);
// Expect that 'recover' will complete successfully.
AWAIT_READY(recover.get());
slave2.get()->terminate();
// The executor may or may not be asked to shut down during teardown.
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
}
// Covers the failure path of agent recovery: if the resources checkpointed
// by an earlier run cannot be satisfied by the value of the '--resources'
// flag, recovery of the restarted agent is expected to fail.
TEST_F(ReservationTest, IncompatibleCheckpointedResources)
{
  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  const string role = frameworkInfo.roles(0);

  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = role;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:8;mem:4096";

  // Armed before the agent starts so the message cannot be missed.
  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);
  StandaloneMasterDetector detector(master.get()->pid);

  Try<Owned<cluster::Slave>> firstAgent =
    StartSlave(&detector, &containerizer, agentFlags, true);

  ASSERT_SOME(firstAgent);
  ASSERT_NE(nullptr, firstAgent.get()->mock());

  firstAgent.get()->start();
  AWAIT_READY(agentUpdate);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // The same cpus/mem amounts, once unreserved and once dynamically
  // reserved for the framework's role and principal.
  Resources plain = Resources::parse("cpus:8;mem:2048").get();
  Resources reservedForRole = plain.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo.principal()));

  // Zero refusal time, so declined resources are not held back for the
  // default 5 seconds.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  // Receives the offers passed to 'resourceOffers'.
  Future<vector<Offer>> offersFuture;

  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture))
    .WillRepeatedly(Return()); // Later offers are ignored.

  EXPECT_CALL(sched, registered(&driver, _, _));

  driver.start();

  // The initial offer carries the unreserved resources.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer firstOffer = offersFuture->front();

  EXPECT_TRUE(Resources(firstOffer.resources()).contains(
      allocatedResources(plain, role)));

  Future<Resources> operationMessage = getOperationMessage(_);

  // Issue the reservation and wait until its operation message is delivered.
  driver.acceptOffers(
      {firstOffer.id()},
      {RESERVE(reservedForRole)},
      noRefusal);

  AWAIT_READY(operationMessage);

  firstAgent.get()->terminate();

  // Pretend the machine rebooted by overwriting the stored boot ID.
  const string bootIdPath = slave::paths::getBootIdPath(
      slave::paths::getMetaRootDir(agentFlags.work_dir));

  ASSERT_SOME(os::write(bootIdPath, "rebooted! ;)"));

  // Only 4 cpus are now advertised, fewer than the 8 that were reserved,
  // which makes the flag incompatible with the checkpointed resources.
  agentFlags.resources = "cpus:4;mem:2048";

  Try<Owned<cluster::Slave>> secondAgent =
    StartSlave(&detector, &containerizer, agentFlags, true);

  ASSERT_SOME(secondAgent);
  ASSERT_NE(nullptr, secondAgent.get()->mock());

  // Outer future: '__recover' was invoked. Inner future: result of recovery.
  Future<Future<Nothing>> recovery;
  EXPECT_CALL(*secondAgent.get()->mock(), __recover(_))
    .WillOnce(DoAll(FutureArg<0>(&recovery), Return()));

  secondAgent.get()->start();

  // '__recover' must be reached ...
  AWAIT_READY(recovery);

  // ... and the recovery it was handed must have failed.
  AWAIT_FAILED(recovery.get());

  secondAgent.get()->terminate();

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// Exercises the authorized path: with ACLs that allow the principal of
// `DEFAULT_CREDENTIAL` to reserve for any role and to unreserve its own
// reservations, a RESERVE followed by an UNRESERVE must both be reflected
// in the subsequent offers.
TEST_F(ReservationTest, GoodACLReserveThenUnreserve)
{
  ACLs acls;

  // Reserve rule: the default principal may reserve for ANY role.
  mesos::ACL::ReserveResources* reserveRule = acls.add_reserve_resources();
  reserveRule->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL.principal());
  reserveRule->mutable_roles()->set_type(mesos::ACL::Entity::ANY);

  // Unreserve rule: the default principal may unreserve resources that
  // were reserved by that same principal.
  mesos::ACL::UnreserveResources* unreserveRule =
    acls.add_unreserve_resources();
  unreserveRule->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL.principal());
  unreserveRule->mutable_reserver_principals()->add_values(
      DEFAULT_CREDENTIAL.principal());

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  const string role = frameworkInfo.roles(0);

  // Master.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.acls = acls;
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = role;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  // Agent.
  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:1;mem:512";

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(agentUpdate);

  // Scheduler.
  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // Zero refusal time, so resources are re-offered without the default
  // 5 second filter.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  Resources plain = Resources::parse("cpus:1;mem:512").get();
  Resources reservedForRole = plain.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo.principal()));

  // Receives the offers passed to 'resourceOffers'.
  Future<vector<Offer>> offersFuture;

  EXPECT_CALL(sched, registered(&driver, _, _));

  // Offer #1.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  driver.start();

  // Offer #1 carries the resources in unreserved form.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  // Offer #2.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // RESERVE everything.
  driver.acceptOffers(
      {current.id()},
      {RESERVE(reservedForRole)},
      noRefusal);

  // Offer #2 carries the resources in reserved form.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(reservedForRole, role)));

  // Offer #3.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // UNRESERVE everything again.
  driver.acceptOffers(
      {current.id()},
      {UNRESERVE(reservedForRole)},
      noRefusal);

  // Offer #3 is back to the unreserved form.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  driver.stop();
  driver.join();
}
// Exercises the unauthorized RESERVE path: the ACLs allow no principal to
// reserve resources, so after a RESERVE attempt the next offer must still
// contain the resources in unreserved form.
TEST_F(ReservationTest, BadACLDropReserve)
{
  ACLs acls;

  // Reserve rule: NONE of the principals may reserve, for ANY role.
  mesos::ACL::ReserveResources* reserveRule = acls.add_reserve_resources();
  reserveRule->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
  reserveRule->mutable_roles()->set_type(mesos::ACL::Entity::ANY);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  const string role = frameworkInfo.roles(0);

  // Master.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.acls = acls;
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = role;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  // Agent.
  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:1;mem:512";

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(agentUpdate);

  // Scheduler.
  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // Zero refusal time, so resources are re-offered without the default
  // 5 second filter.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  Resources plain = Resources::parse("cpus:1;mem:512").get();
  Resources reservedForRole = plain.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo.principal()));

  // Receives the offers passed to 'resourceOffers'.
  Future<vector<Offer>> offersFuture;

  EXPECT_CALL(sched, registered(&driver, _, _));

  // Offer #1.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  driver.start();

  // Offer #1 carries the resources in unreserved form.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  // Offer #2.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // Attempt the RESERVE that the ACLs forbid.
  driver.acceptOffers(
      {current.id()},
      {RESERVE(reservedForRole)},
      noRefusal);

  // Offer #2 must be unchanged: the resources are still unreserved.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  driver.stop();
  driver.join();
}
// Exercises the unauthorized UNRESERVE path: the ACLs let the default
// principal reserve but never unreserve, so an UNRESERVE is expected to be
// dropped while a RESERVE issued in the same call still takes effect.
TEST_F(ReservationTest, BadACLDropUnreserve)
{
  ACLs acls;

  // Reserve rule: the default principal may reserve for ANY role.
  mesos::ACL::ReserveResources* reserveRule = acls.add_reserve_resources();
  reserveRule->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL.principal());
  reserveRule->mutable_roles()->set_type(mesos::ACL::Entity::ANY);

  // Unreserve rule: the default principal may unreserve reservations made
  // by NONE of the principals, i.e. nothing at all.
  mesos::ACL::UnreserveResources* unreserveRule =
    acls.add_unreserve_resources();
  unreserveRule->mutable_principals()->add_values(
      DEFAULT_CREDENTIAL.principal());
  unreserveRule->mutable_reserver_principals()->set_type(
      mesos::ACL::Entity::NONE);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  const string role = frameworkInfo.roles(0);

  // Master.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.acls = acls;
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = role;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  // Agent.
  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:2;mem:1024";

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(agentUpdate);

  // Scheduler.
  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // Zero refusal time, so resources are re-offered without the default
  // 5 second filter.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  // Two resource sets, each in unreserved and dynamically reserved form.
  // The first is half of the agent's total, the second a quarter.
  Resources halfPlain = Resources::parse("cpus:1;mem:512").get();
  Resources halfReserved = halfPlain.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo.principal()));

  Resources quarterPlain = Resources::parse("cpus:0.5;mem:256").get();
  Resources quarterReserved = quarterPlain.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo.principal()));

  // Receives the offers passed to 'resourceOffers'.
  Future<vector<Offer>> offersFuture;

  EXPECT_CALL(sched, registered(&driver, _, _));

  // Offer #1.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  driver.start();

  // Offer #1 carries the whole agent, unreserved: twice the first set.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(halfPlain + halfPlain, role)));

  // Offer #2.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // RESERVE the first set.
  driver.acceptOffers(
      {current.id()},
      {RESERVE(halfReserved)},
      noRefusal);

  // Offer #2 carries the reserved half plus an equally sized unreserved
  // half.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(halfReserved + halfPlain, role)));

  // Offer #3.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // In a single call: UNRESERVE the first set and RESERVE the second.
  driver.acceptOffers(
      {current.id()},
      {UNRESERVE(halfReserved), RESERVE(quarterReserved)},
      noRefusal);

  // Offer #3 must hold both reservations, because the UNRESERVE is
  // expected to fail authorization.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(
          halfReserved + quarterReserved + quarterPlain,
          role)));

  driver.stop();
  driver.join();
}
// Tests a couple more complex combinations of `RESERVE`, `UNRESERVE`, and
// `LAUNCH` operations to verify that they work with authorization.
//
// The ACLs let the default principal reserve for any role but never
// unreserve. Every `UNRESERVE` below is therefore expected to be dropped,
// while the `RESERVE` and `LAUNCH` operations issued in the same
// `acceptOffers` call are still expected to take effect.
TEST_F(ReservationTest, ACLMultipleOperations)
{
  // Pause the clock and control it manually in order to
  // control the timing of the offer cycle.
  Clock::pause();

  ACLs acls;

  // This principal can reserve resources for any role.
  mesos::ACL::ReserveResources* reserve = acls.add_reserve_resources();
  reserve->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
  reserve->mutable_roles()->set_type(mesos::ACL::Entity::ANY);

  // This principal cannot unreserve any resources.
  mesos::ACL::UnreserveResources* unreserve = acls.add_unreserve_resources();
  unreserve->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
  unreserve->mutable_reserver_principals()->set_type(mesos::ACL::Entity::NONE);

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  // Create a master.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.acls = acls;
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = frameworkInfo.roles(0);

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  MockExecutor exec(DEFAULT_EXECUTOR_ID);
  TestContainerizer containerizer(&exec);

  // Create a slave.
  slave::Flags slaveFlags = CreateSlaveFlags();
  slaveFlags.resources = "cpus:2;mem:1024";

  Future<UpdateSlaveMessage> updateSlaveMessage =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave =
    StartSlave(detector.get(), &containerizer, slaveFlags);
  ASSERT_SOME(slave);

  // The clock is paused, so advance it by the registration backoff to let
  // the slave register.
  Clock::advance(slaveFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(updateSlaveMessage);

  // Create a scheduler.
  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // We use the filter explicitly here so that the resources will not
  // be filtered for 5 seconds (the default).
  Filters filters;
  filters.set_refuse_seconds(0);

  // Define the resources to be reserved.
  Resources unreserved1 = Resources::parse("cpus:1;mem:512").get();
  Resources dynamicallyReserved1 =
    unreserved1.pushReservation(createDynamicReservationInfo(
        frameworkInfo.roles(0), frameworkInfo.principal()));

  Resources unreserved2 = Resources::parse("cpus:0.5;mem:256").get();
  Resources dynamicallyReserved2 =
    unreserved2.pushReservation(createDynamicReservationInfo(
        frameworkInfo.roles(0), frameworkInfo.principal()));

  // We use this to capture offers from 'resourceOffers'.
  Future<vector<Offer>> offers;

  // The expectation for the first offer.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  EXPECT_CALL(sched, registered(&driver, _, _));

  driver.start();

  // Advance the clock to generate an offer.
  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  // In the first offer, expect an offer with unreserved resources.
  Clock::advance(slaveFlags.registration_backoff_factor);
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());
  Offer offer = offers.get()[0];

  // The slave's total resources are twice those defined by `unreserved1`.
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(unreserved1 + unreserved1, frameworkInfo.roles(0))));

  // The expectation for the next offer.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  // Reserve the first set of resources.
  driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved1)}, filters);

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  // In the next offer, expect an offer with reserved resources.
  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  // The reserved resources and an equal portion of
  // unreserved resources should be present.
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(
          dynamicallyReserved1 + unreserved1,
          frameworkInfo.roles(0))));

  // The expectation for the next offer.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  // Create a task to launch with the resources of `dynamicallyReserved2`.
  TaskInfo taskInfo1 =
    createTask(offer.slave_id(), dynamicallyReserved2, "exit 1", exec.id);

  EXPECT_CALL(exec, registered(_, _, _, _));

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));

  // Expect a TASK_FINISHED status.
  // NOTE: despite its name, this future captures the status update
  // delivered to the scheduler for the first task.
  Future<TaskStatus> statusUpdateAcknowledgement;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&statusUpdateAcknowledgement));

  // Attempt to unreserve a set of resources,
  // reserve a second set, and launch a task.
  driver.acceptOffers({offer.id()},
      {UNRESERVE(dynamicallyReserved1),
       RESERVE(dynamicallyReserved2),
       LAUNCH({taskInfo1})},
      filters);

  // Wait for the TASK_FINISHED status update of the first task.
  AWAIT_READY(statusUpdateAcknowledgement);
  EXPECT_EQ(TASK_FINISHED, statusUpdateAcknowledgement->state());

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  // In the next offer, expect to find both sets of reserved
  // resources, since the Unreserve operation should fail.
  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(
          dynamicallyReserved1 + dynamicallyReserved2 + unreserved2,
          frameworkInfo.roles(0))));

  // The expectation for the next offer.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offers));

  // Create a task to launch with the resources of `dynamicallyReserved1`.
  TaskInfo taskInfo2 =
    createTask(offer.slave_id(), dynamicallyReserved1, "exit 1", exec.id);

  EXPECT_CALL(exec, launchTask(_, _))
    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));

  // NOTE: despite its name, this future is expected to carry a
  // TASK_FINISHED status: the task should still launch because the
  // preceding `UNRESERVE` operations are dropped.
  Future<TaskStatus> failedTaskStatus;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&failedTaskStatus));

  // Attempt to unreserve all the dynamically reserved resources
  // and launch a task on `dynamicallyReserved1`.
  driver.acceptOffers({offer.id()},
      {UNRESERVE(dynamicallyReserved1),
       UNRESERVE(dynamicallyReserved2),
       LAUNCH({taskInfo2})},
      filters);

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  // In the next offer, expect to find the reserved resources.
  AWAIT_READY(offers);
  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(
          dynamicallyReserved1 + dynamicallyReserved2 + unreserved2,
          frameworkInfo.roles(0))));

  // Check that the task launched as expected. Wait for the status update
  // explicitly before dereferencing the future, as is done for the first
  // task, so that a missing update surfaces as a bounded test failure.
  AWAIT_READY(failedTaskStatus);
  EXPECT_EQ(TASK_FINISHED, failedTaskStatus->state());

  EXPECT_CALL(exec, shutdown(_))
    .Times(AtMost(1));

  driver.stop();
  driver.join();
}
// With framework authentication switched off on the master, a framework
// that carries no principal must still be able to RESERVE and then
// UNRESERVE resources. The reservation itself also carries no principal.
TEST_F(ReservationTest, WithoutAuthenticationWithoutPrincipal)
{
  // The clock is driven by hand so the test decides when offer cycles run.
  Clock::pause();

  // Framework with its principal removed.
  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");
  frameworkInfo.clear_principal();

  const string role = frameworkInfo.roles(0);

  // Master that does not authenticate frameworks.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.authenticate_frameworks = false;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:1;mem:512";

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  // Step over the registration backoff so the agent can register.
  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(agentUpdate);

  // No credential is handed to the driver.
  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get()->pid);

  // Zero refusal time, so resources are re-offered without the default
  // 5 second filter.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  Resources plain = Resources::parse("cpus:1;mem:512").get();

  // Reserved form whose `ReservationInfo` is built from the role only,
  // without a principal.
  Resources reservedForRole =
    plain.pushReservation(createDynamicReservationInfo(role));

  // Receives the offers passed to `resourceOffers`.
  Future<vector<Offer>> offersFuture;

  // Offer #1: unreserved resources.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  EXPECT_CALL(sched, registered(&driver, _, _));

  driver.start();

  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  // Offer #2: reserved resources.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // Try the RESERVE.
  driver.acceptOffers(
      {current.id()},
      {RESERVE(reservedForRole)},
      noRefusal);

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  // The reservation must have gone through.
  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(reservedForRole, role)));

  // Offer #3: unreserved resources again.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // UNRESERVE what was just reserved.
  driver.acceptOffers(
      {current.id()},
      {UNRESERVE(reservedForRole)},
      noRefusal);

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  driver.stop();
  driver.join();
}
// With framework authentication switched off on the master, a framework
// that does carry a principal must still be able to RESERVE and then
// UNRESERVE resources. The reservation carries the framework's principal.
TEST_F(ReservationTest, WithoutAuthenticationWithPrincipal)
{
  // The clock is driven by hand so the test decides when offer cycles run.
  Clock::pause();

  // Framework that keeps the principal of the default framework info.
  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  const string role = frameworkInfo.roles(0);

  // Master that does not authenticate frameworks.
  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.authenticate_frameworks = false;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:1;mem:512";

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  // Step over the registration backoff so the agent can register.
  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(agentUpdate);

  // No credential is handed to the driver.
  MockScheduler sched;
  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get()->pid);

  // Zero refusal time, so resources are re-offered without the default
  // 5 second filter.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  Resources plain = Resources::parse("cpus:1;mem:512").get();

  // Reserved form whose `ReservationInfo` is built from both the role and
  // the framework's principal.
  Resources reservedForRole = plain.pushReservation(
      createDynamicReservationInfo(role, frameworkInfo.principal()));

  // Receives the offers passed to `resourceOffers`.
  Future<vector<Offer>> offersFuture;

  // Offer #1: unreserved resources.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  EXPECT_CALL(sched, registered(&driver, _, _));

  driver.start();

  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  // Offer #2: reserved resources.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // Try the RESERVE.
  driver.acceptOffers(
      {current.id()},
      {RESERVE(reservedForRole)},
      noRefusal);

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  // The reservation must have gone through.
  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(reservedForRole, role)));

  // Offer #3: unreserved resources again.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  // UNRESERVE what was just reserved.
  driver.acceptOffers(
      {current.id()},
      {UNRESERVE(reservedForRole)},
      noRefusal);

  Clock::settle();
  Clock::advance(masterFlags.allocation_interval);

  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, role)));

  driver.stop();
  driver.join();
}
// A framework registered with one role attempts to RESERVE resources for a
// different role ("foo"). The operation must be dropped: the allocator is
// never asked to update the allocation, and the next offer still carries
// the resources in unreserved form.
TEST_F(ReservationTest, DropReserveWithDifferentRole)
{
  const string frameworkRole = "role";

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, frameworkRole);

  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.allocation_interval = Milliseconds(5);

  // A test allocator is injected so calls into it can be observed.
  TestAllocator<> allocator;

  EXPECT_CALL(allocator, initialize(_, _, _));

  Try<Owned<cluster::Master>> master = StartMaster(&allocator, masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:1;mem:512";

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  EXPECT_CALL(allocator, addSlave(_, _, _, _, _, _));

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(agentUpdate);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // The agent's full resources in unreserved form, plus the same amounts
  // reserved for a role ("foo") the framework is not registered with.
  Resources plain = Resources::parse("cpus:1;mem:512").get();
  Resources reservedForOtherRole = plain.pushReservation(
      createDynamicReservationInfo("foo", frameworkInfo.principal()));

  // Zero refusal time, so resources are re-offered without the default
  // 5 second filter.
  Filters noRefusal;
  noRefusal.set_refuse_seconds(0);

  // Receives the offers passed to 'resourceOffers'.
  Future<vector<Offer>> offersFuture;

  // Offer #1.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture));

  EXPECT_CALL(sched, registered(&driver, _, _));

  EXPECT_CALL(allocator, addFramework(_, _, _, _, _));

  driver.start();

  // Offer #1 carries the resources in unreserved form.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  Offer current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, frameworkRole)));

  // Offer #2; anything after it is ignored.
  EXPECT_CALL(sched, resourceOffers(&driver, _))
    .WillOnce(FutureArg<1>(&offersFuture))
    .WillRepeatedly(Return());

  // A dropped RESERVE must never reach the allocator.
  EXPECT_CALL(allocator, updateAllocation(_, _, _, _))
    .Times(0);

  // Attempt the RESERVE for the foreign role.
  driver.acceptOffers(
      {current.id()},
      {RESERVE(reservedForOtherRole)},
      noRefusal);

  // Offer #2 must be unchanged: the resources are still unreserved.
  AWAIT_READY(offersFuture);
  ASSERT_EQ(1u, offersFuture->size());

  current = offersFuture->front();

  EXPECT_TRUE(Resources(current.resources()).contains(
      allocatedResources(plain, frameworkRole)));

  driver.stop();
  driver.join();
}
// This test ensures that a framework can't unreserve resources
// reserved by a framework with another role.
//
// Outline:
//   1. `framework1` (role1) reserves half of the agent's memory and then
//      declines the resulting offer "forever".
//   2. `framework2` (role2) receives an offer that does not contain that
//      reservation and nevertheless tries to UNRESERVE it; the test expects
//      the allocator's `updateAllocation` not to be invoked.
//   3. `framework2` is stopped, `framework1` revives its offers and the
//      test checks that the reservation is still offered to it.
TEST_F(ReservationTest, PreventUnreservingAlienResources)
{
  const string frameworkRole1 = "role1";
  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo1.set_name("framework1");
  frameworkInfo1.set_roles(0, frameworkRole1);

  const string frameworkRole2 = "role2";
  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo2.set_name("framework2");
  frameworkInfo2.set_roles(0, frameworkRole2);

  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.allocation_interval = Milliseconds(5);

  TestAllocator<> allocator;

  EXPECT_CALL(allocator, initialize(_, _, _));

  Try<Owned<cluster::Master>> master = StartMaster(&allocator, masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:1;mem:512";

  Future<UpdateSlaveMessage> updateSlaveMessage =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  AWAIT_READY(updateSlaveMessage);

  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);

  MockScheduler sched2;
  MesosSchedulerDriver driver2(
      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);

  // We use this to capture offers from 'resourceOffers'.
  Future<vector<Offer>> offers;

  EXPECT_CALL(sched1, registered(&driver1, _, _));

  // The expectation for the first offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers));

  driver1.start();

  // In the first offer, expect an offer with unreserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  Offer offer = offers->front();

  const Resources unreserved = Resources::parse("cpus:1;mem:512").get();
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(unreserved, frameworkInfo1.roles(0))));

  // The expectation for the next offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers));

  // We use the filter explicitly here so that the resources will not
  // be filtered for 5 seconds (the default).
  Filters filters;
  filters.set_refuse_seconds(0);

  // Reserve half the memory for `frameworkRole1`.
  const Resources halfMemory = Resources::parse("mem:256").get();
  const Resources dynamicallyReserved = halfMemory.pushReservation(
      createDynamicReservationInfo(frameworkRole1, frameworkInfo1.principal()));

  driver1.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);

  // In the next offer, expect an offer with reserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers->front();

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo1.roles(0))));
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(halfMemory, frameworkInfo1.roles(0))));

  // The filter to decline the offer "forever".
  Filters filtersForever;
  filtersForever.set_refuse_seconds(std::numeric_limits<double>::max());

  // Decline the offer "forever" in order to force `framework2` to
  // receive the remaining resources.
  driver1.declineOffer(offer.id(), filtersForever);

  EXPECT_CALL(sched2, registered(&driver2, _, _));

  // The expectation for `driver2`'s first offer.
  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
    .WillOnce(FutureArg<1>(&offers));

  driver2.start();

  // Expect an offer without the resources reserved by `framework1`.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers->front();

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(halfMemory, frameworkInfo2.roles(0))));
  EXPECT_FALSE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo2.roles(0))));

  // The expectation for the next offer.
  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
    .WillOnce(FutureArg<1>(&offers));

  // Expect that the unreserve operation will be dropped and hence
  // allocator not called at all.
  EXPECT_CALL(allocator, updateAllocation(_, _, _, _))
    .Times(0);

  // Try to make `framework2` "steal" the resources reserved by `framework1`.
  driver2.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);

  // Expect another offer without the resources reserved by `framework1`.
  AWAIT_READY(offers);

  // Switch to the newly received offer. The previous one was already
  // consumed by the `acceptOffers` call above, so both the checks and
  // the decline below have to refer to the fresh offer.
  ASSERT_EQ(1u, offers->size());
  offer = offers->front();

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(halfMemory, frameworkInfo2.roles(0))));
  EXPECT_FALSE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo2.roles(0))));

  // Decline the offer "forever" in order to force `framework1` to
  // receive the remaining resources.
  driver2.declineOffer(offer.id(), filtersForever);

  driver2.stop();
  driver2.join();

  // The expectation for the last offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  // Make the allocator ignore the filters.
  driver1.reviveOffers();

  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers->front();

  // Make sure that the reservation is still in place.
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(halfMemory, frameworkInfo1.roles(0))));
  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(dynamicallyReserved, frameworkInfo1.roles(0))));

  driver1.stop();
  driver1.join();
}
// Fixture used to group the reservation checkpointing tests; it adds
// nothing on top of `MesosTest`.
class ReservationCheckpointingTest : public MesosTest
{
};
// Checks the `CheckpointResourcesMessage`s that are emitted while a
// framework reserves and unreserves resources: one message is awaited
// per operation, and each message is expected to carry exactly the set
// of resources that has to be checkpointed on the agent at that point.
TEST_F(ReservationCheckpointingTest, SendingCheckpointResourcesMessage)
{
  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  // The single role of the framework, used throughout the test.
  const string role = frameworkInfo.roles(0);

  master::Flags masterFlags = CreateMasterFlags();
  masterFlags.allocation_interval = Milliseconds(5);
  masterFlags.roles = role;

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags agentFlags = CreateSlaveFlags();
  agentFlags.resources = "cpus:8;mem:4096";

  // The master only sends `CheckpointResourcesMessage` to agents which
  // are not resource provider-capable, so advertise every agent
  // capability except `RESOURCE_PROVIDER`.
  SlaveCapabilities capabilities;

  foreach (
      const SlaveInfo::Capability& capability,
      slave::AGENT_CAPABILITIES()) {
    if (capability.type() == SlaveInfo::Capability::RESOURCE_PROVIDER) {
      continue;
    }

    capabilities.add_capabilities()->CopyFrom(capability);
  }

  agentFlags.agent_features = capabilities;

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
  ASSERT_SOME(agent);

  // Both reservations below are made for the framework's role and
  // principal.
  const auto reservation =
    createDynamicReservationInfo(role, frameworkInfo.principal());

  const Resources unreserved1 = Resources::parse("cpus:8").get();
  const Resources reserved1 = unreserved1.pushReservation(reservation);

  const Resources unreserved2 = Resources::parse("mem:2048").get();
  const Resources reserved2 = unreserved2.pushReservation(reservation);

  MockScheduler mockScheduler;
  MesosSchedulerDriver schedulerDriver(
      &mockScheduler, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  // Captures the offers handed to 'resourceOffers'.
  Future<vector<Offer>> offers;

  EXPECT_CALL(mockScheduler, registered(&schedulerDriver, _, _));

  // Only the first offer is of interest.
  EXPECT_CALL(mockScheduler, resourceOffers(&schedulerDriver, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  schedulerDriver.start();

  // The first offer must contain the sum of 'unreserved1' and
  // 'unreserved2'.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  const Offer offer = offers->front();

  const Resources offered = offer.resources();
  EXPECT_TRUE(offered.contains(
      allocatedResources(unreserved1 + unreserved2, role)));

  // NOTE(review): the futures are deliberately set up in reverse order
  // (third message first); presumably the most recently registered
  // expectation is matched first - confirm before reordering.
  Future<CheckpointResourcesMessage> message3 =
    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);

  Future<CheckpointResourcesMessage> message2 =
    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);

  Future<CheckpointResourcesMessage> message1 =
    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);

  // Pass the filter explicitly so that the resources are not filtered
  // for 5 seconds (the default).
  Filters filters;
  filters.set_refuse_seconds(0);

  // Reserve both sets of resources, then unreserve the first one again.
  schedulerDriver.acceptOffers(
      {offer.id()},
      {RESERVE(reserved1), RESERVE(reserved2), UNRESERVE(reserved1)},
      filters);

  // NOTE: Currently, we send one message per operation. But this is
  // an implementation detail which is subject to change.

  // First message: 'RESERVE(reserved1)', so the checkpointed resources
  // should correspond to 'reserved1'.
  AWAIT_READY(message1);
  EXPECT_EQ(Resources(message1->resources()), reserved1);

  // Second message: 'RESERVE(reserved2)', so the checkpointed resources
  // should correspond to 'reserved1 + reserved2'.
  AWAIT_READY(message2);
  EXPECT_EQ(Resources(message2->resources()), reserved1 + reserved2);

  // Third message: 'UNRESERVE(reserved1)', so the checkpointed resources
  // should correspond to 'reserved2'.
  AWAIT_READY(message3);
  EXPECT_EQ(Resources(message3->resources()), reserved2);

  schedulerDriver.stop();
  schedulerDriver.join();
}
// This test verifies that the agent still checkpoints and recovers resources
// in the format used by agents that don't atomically checkpoint operations and
// resources.
//
// To verify this the test does the following:
//
// 1. Start a master, agent, and a framework.
// 2. Make the framework reserve resources and assert that they are offered
//    back.
// 3. Kill the agent.
// 4. Remove the file that contains the operations and resources.
// 5. Start a new agent using the same metadata directory.
// 6. Start a new framework and assert that it is offered the resources that
//    were reserved in step #2.
TEST_F(ReservationTest, ReservationCheckpointedBackwardsCompatibility)
{
  // Pause the clock and control it manually in order to
  // control the timing of the offer cycle.
  Clock::pause();

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  master::Flags masterFlags = CreateMasterFlags();

  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
  ASSERT_SOME(master);

  slave::Flags slaveFlags = CreateSlaveFlags();
  slaveFlags.resources = "cpus:8;mem:2048";

  Future<UpdateSlaveMessage> updateSlaveMessage =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(slave);

  // Advance the (paused) clock so that the agent registers.
  Clock::advance(slaveFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(updateSlaveMessage);

  MockScheduler sched1;
  MesosSchedulerDriver driver1(
      &sched1, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
  Resources reserved = unreserved.pushReservation(createDynamicReservationInfo(
      frameworkInfo.roles(0), frameworkInfo.principal()));

  // We use this to capture offers from 'resourceOffers'.
  Future<vector<Offer>> offers;

  EXPECT_CALL(sched1, registered(&driver1, _, _));

  // Set an expectation for the first offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver1.start();

  // Advance the clock to generate an offer.
  Clock::advance(masterFlags.allocation_interval);
  Clock::settle();

  // In the first offer, expect an offer with unreserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  Offer offer = offers.get()[0];

  Future<Resources> message = getOperationMessage(slave.get()->pid);

  // We use the filter explicitly here so that the resources
  // will not be filtered for 5 seconds (the default).
  Filters filters;
  filters.set_refuse_seconds(0);

  // Reserve the resources.
  driver1.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);

  // Expect the agent to receive the operation message.
  AWAIT_READY(message);

  // This is to make sure the agent processes the operation message.
  Clock::settle();

  // Set an expectation for the next offer.
  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  // Advance the clock to generate an offer.
  Clock::advance(masterFlags.allocation_interval);
  Clock::settle();

  // In the next offer, expect an offer with the reserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(reserved, frameworkInfo.roles(0))));

  // Stop the framework.
  driver1.stop();
  driver1.join();

  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
      SlaveReregisteredMessage(), master.get()->pid, _);

  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  // Simulate agent failover.
  slave.get()->terminate();
  slave->reset();

  // Remove the file that contains the operations and resources (step #4).
  // A failure here is reported through the test framework instead of
  // aborting the whole test binary.
  string resourceStatePath = slave::paths::getResourceStatePath(
      slave::paths::getMetaRootDir(slaveFlags.work_dir));

  ASSERT_SOME(os::rm(resourceStatePath));

  slave = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(slave);

  // Advance the clock, so that the agent re-registers.
  Clock::advance(slaveFlags.registration_backoff_factor);

  // Resume the clock to avoid deadlocks related to agent registration.
  // See MESOS-8828.
  Clock::resume();

  // Wait for the agent to re-register.
  AWAIT_READY(slaveReregistered);
  AWAIT_READY(updateSlaveMessage);

  Clock::pause();

  MockScheduler sched2;
  MesosSchedulerDriver driver2(
      &sched2, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);

  EXPECT_CALL(sched2, registered(&driver2, _, _));

  // Set an expectation for the next new framework's first offer.
  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
    .WillOnce(FutureArg<1>(&offers))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  driver2.start();

  // Advance the clock to generate an offer.
  Clock::advance(masterFlags.allocation_interval);
  Clock::settle();

  // Expect an offer with the reserved resources.
  AWAIT_READY(offers);

  ASSERT_EQ(1u, offers->size());
  offer = offers.get()[0];

  EXPECT_TRUE(Resources(offer.resources()).contains(
      allocatedResources(reserved, frameworkInfo.roles(0))));

  driver2.stop();
  driver2.join();
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {