blob: 639435729e70c64a2aee45c8b22850e1925aa260 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>
#include <google/protobuf/util/message_differencer.h>
#include <mesos/attributes.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authentication/http/basic_authenticator_factory.hpp>
#include <mesos/log/log.hpp>
#include <mesos/state/log.hpp>
#include <mesos/state/state.hpp>
#include <mesos/state/storage.hpp>
#include <process/clock.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/bytes.hpp>
#include <stout/stopwatch.hpp>
#include <stout/uuid.hpp>
#include <stout/tests/utils.hpp>
#include "common/protobuf_utils.hpp"
#include "log/replica.hpp"
#include "log/tool/initialize.hpp"
#include "master/flags.hpp"
#include "master/maintenance.hpp"
#include "master/master.hpp"
#include "master/quota.hpp"
#include "master/registrar.hpp"
#include "master/registry_operations.hpp"
#include "master/weights.hpp"
#include "tests/mesos.hpp"
using namespace mesos::internal::master;
using namespace process;
using mesos::log::Log;
using mesos::internal::log::Replica;
using std::cout;
using std::endl;
using std::map;
using std::pair;
using std::set;
using std::string;
using std::vector;
using process::Clock;
using process::Owned;
using process::http::OK;
using process::http::Response;
using process::http::Unauthorized;
using google::protobuf::Map;
using google::protobuf::RepeatedPtrField;
using google::protobuf::util::MessageDifferencer;
using mesos::internal::protobuf::maintenance::createMachineList;
using mesos::internal::protobuf::maintenance::createSchedule;
using mesos::internal::protobuf::maintenance::createUnavailability;
using mesos::internal::protobuf::maintenance::createWindow;
using testing::_;
using testing::DoAll;
using testing::Eq;
using testing::Return;
using ::testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
namespace quota = mesos::internal::master::quota;
namespace authentication = process::http::authentication;
using namespace mesos::maintenance;
using namespace mesos::quota;
using namespace mesos::internal::master::maintenance;
using namespace mesos::internal::master::quota;
using namespace mesos::internal::master::weights;
using mesos::http::authentication::BasicAuthenticatorFactory;
using mesos::state::LogStorage;
using mesos::state::State;
using mesos::state::Storage;
using state::Entry;
static vector<WeightInfo> getWeightInfos(
const hashmap<string, double>& weights) {
vector<WeightInfo> weightInfos;
foreachpair (const string& role, double weight, weights) {
WeightInfo weightInfo;
weightInfo.set_role(role);
weightInfo.set_weight(weight);
weightInfos.push_back(weightInfo);
}
return weightInfos;
}
// TODO(xujyan): This class copies code from LogStateTest. It would
// be nice to find a common location for log related base tests when
// there are more uses of it.
class RegistrarTestBase : public TemporaryDirectoryTest
{
public:
RegistrarTestBase()
: log(nullptr),
storage(nullptr),
state(nullptr),
replica2(nullptr) {}
protected:
void SetUp() override
{
TemporaryDirectoryTest::SetUp();
// For initializing the replicas.
log::tool::Initialize initializer;
string path1 = os::getcwd() + "/.log1";
string path2 = os::getcwd() + "/.log2";
initializer.flags.path = path1;
initializer.execute();
initializer.flags.path = path2;
initializer.execute();
// Only create the replica for 'path2' (i.e., the second replica)
// as the first replica will be created when we create a Log.
replica2 = new Replica(path2);
set<UPID> pids;
pids.insert(replica2->pid());
log = new Log(2, path1, pids);
storage = new LogStorage(log);
state = new State(storage);
// Compensate for slow CI machines / VMs.
flags.registry_store_timeout = process::TEST_AWAIT_TIMEOUT;
master.CopyFrom(protobuf::createMasterInfo(UPID("master@127.0.0.1:5050")));
SlaveID id;
id.set_value("1");
SlaveInfo info;
info.set_hostname("localhost");
info.mutable_id()->CopyFrom(id);
slave.CopyFrom(info);
}
void TearDown() override
{
delete state;
delete storage;
delete log;
delete replica2;
TemporaryDirectoryTest::TearDown();
}
Log* log;
Storage* storage;
State* state;
Replica* replica2;
MasterInfo master;
SlaveInfo slave;
Flags flags;
};
class RegistrarTest : public RegistrarTestBase {};
TEST_F(RegistrarTest, Recover)
{
Registrar registrar(flags, state);
// Operations preceding recovery will fail.
AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(slave))));
AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(slave))));
Future<Registry> registry = registrar.recover(master);
// Before waiting for the recovery to complete, invoke some
// operations to ensure they do not fail.
Future<bool> admit = registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave)));
Future<bool> unreachable = registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(slave, protobuf::getCurrentTime())));
Future<bool> reachable = registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(slave)));
Future<bool> remove = registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(slave)));
AWAIT_READY(registry);
EXPECT_EQ(master, registry->master().info());
AWAIT_TRUE(admit);
AWAIT_TRUE(unreachable);
AWAIT_TRUE(reachable);
AWAIT_TRUE(remove);
}
TEST_F(RegistrarTest, Admit)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(new AdmitSlave(slave))));
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(new AdmitSlave(slave))));
}
TEST_F(RegistrarTest, UpdateSlave)
{
// Add a new slave to the registry.
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
slave.set_hostname("original");
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
}
// Verify that the slave is present, and update its hostname.
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ("original", registry->slaves().slaves(0).info().hostname());
slave.set_hostname("changed");
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new UpdateSlave(slave))));
}
// Verify that the hostname indeed changed, and do one additional update
// to check that the operation is idempotent.
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ("changed", registry->slaves().slaves(0).info().hostname());
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new UpdateSlave(slave))));
}
// Verify that nothing changed from the second update.
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ("changed", registry->slaves().slaves(0).info().hostname());
}
}
TEST_F(RegistrarTest, MarkReachable)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
SlaveID id1;
id1.set_value("1");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(id1);
SlaveID id2;
id2.set_value("2");
SlaveInfo info2;
info2.set_hostname("localhost");
info2.mutable_id()->CopyFrom(id2);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
// Attempting to mark a slave as reachable that is already reachable
// should not result in an error.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(info1))));
// Attempting to mark a slave as reachable that is not in the
// unreachable list should not result in error.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(info2))));
}
TEST_F(RegistrarTest, MarkUnreachable)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
SlaveID id1;
id1.set_value("1");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(id1);
SlaveID id2;
id2.set_value("2");
SlaveInfo info2;
info2.set_hostname("localhost");
info2.mutable_id()->CopyFrom(id2);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
// If a slave is already unreachable, trying to mark it unreachable
// again should fail.
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
}
// Verify that an admitted slave can be marked as gone.
TEST_F(RegistrarTest, MarkGone)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
SlaveID id1;
id1.set_value("1");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(id1);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveGone(info1.id(), protobuf::getCurrentTime()))));
}
// Verify that an unreachable slave can be marked as gone.
TEST_F(RegistrarTest, MarkUnreachableGone)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
SlaveID id1;
id1.set_value("1");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(id1);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveGone(info1.id(), protobuf::getCurrentTime()))));
// If a slave is already gone, trying to mark it gone again should fail.
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveGone(info1.id(), protobuf::getCurrentTime()))));
// If a slave is already gone, trying to mark it unreachable
// again should fail.
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
}
TEST_F(RegistrarTest, Prune)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
SlaveID id1;
id1.set_value("1");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(id1);
SlaveID id2;
id2.set_value("2");
SlaveInfo info2;
info2.set_hostname("localhost");
info2.mutable_id()->CopyFrom(id2);
SlaveID id3;
id3.set_value("3");
SlaveInfo info3;
info3.set_hostname("localhost");
info3.mutable_id()->CopyFrom(id3);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info2))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info3))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info2, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveGone(info3.id(), protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new Prune({id1}, {}))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new Prune({id2}, {}))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new Prune({}, {id3}))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info2))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info3))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info2, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveGone(info3.id(), protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new Prune({id1}, {}))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new Prune({id2}, {}))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new Prune({}, {id3}))));
}
TEST_F(RegistrarTest, Remove)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
SlaveID id1;
id1.set_value("1");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(id1);
SlaveID id2;
id2.set_value("2");
SlaveInfo info2;
info2.set_hostname("localhost");
info2.mutable_id()->CopyFrom(id2);
SlaveID id3;
id3.set_value("3");
SlaveInfo info3;
info3.set_hostname("localhost");
info3.mutable_id()->CopyFrom(id3);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info2))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info3))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(info1))));
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(info2))));
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(info2))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(info3))));
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(info3))));
}
// NOTE: For the following tests, the state of the registrar can
// only be viewed once per instantiation of the registrar.
// To check the result of each operation, we must re-construct
// the registrar, which is done by putting the code into scoped blocks.
// TODO(josephw): Consider refactoring these maintenance operation tests
// to use a helper function for each un-named scoped block.
// For example:
// MaintenanceTest(flags, state, [=](const Registry& registry) {
// // Checks and operations. i.e.:
// EXPECT_EQ(1, registry->schedules().size());
// });
// Adds maintenance schedules to the registry, one machine at a time.
// Then removes machines from the schedule.
TEST_F(RegistrarTest, UpdateMaintenanceSchedule)
{
// Machine definitions used in this test.
MachineID machine1;
machine1.set_ip("0.0.0.1");
MachineID machine2;
machine2.set_hostname("2");
MachineID machine3;
machine3.set_hostname("3");
machine3.set_ip("0.0.0.3");
Unavailability unavailability = createUnavailability(Clock::now());
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Schedule one machine for maintenance.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine1}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
}
{
// Check that one schedule and one machine info was made.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_EQ(1, registry->schedules(0).windows().size());
EXPECT_EQ(1, registry->schedules(0).windows(0).machine_ids().size());
EXPECT_EQ(1, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(0).info().mode());
// Extend the schedule by one machine (in a different window).
maintenance::Schedule schedule = createSchedule({
createWindow({machine1}, unavailability),
createWindow({machine2}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
}
{
// Check that both machines are part of maintenance.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_EQ(2, registry->schedules(0).windows().size());
EXPECT_EQ(1, registry->schedules(0).windows(0).machine_ids().size());
EXPECT_EQ(1, registry->schedules(0).windows(1).machine_ids().size());
EXPECT_EQ(2, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(1).info().mode());
// Extend a window by one machine.
maintenance::Schedule schedule = createSchedule({
createWindow({machine1}, unavailability),
createWindow({machine2, machine3}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
}
{
// Check that all three machines are included.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_EQ(2, registry->schedules(0).windows().size());
EXPECT_EQ(1, registry->schedules(0).windows(0).machine_ids().size());
EXPECT_EQ(2, registry->schedules(0).windows(1).machine_ids().size());
EXPECT_EQ(3, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(2).info().mode());
// Rearrange the schedule into one window.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine1, machine2, machine3}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
}
{
// Check that the machine infos are unchanged, but the schedule is.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_EQ(1, registry->schedules(0).windows().size());
EXPECT_EQ(3, registry->schedules(0).windows(0).machine_ids().size());
EXPECT_EQ(3, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(0).info().mode());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(1).info().mode());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(2).info().mode());
// Delete one machine from the schedule.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine2, machine3}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
}
{
// Check that one machine info is removed.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_EQ(1, registry->schedules(0).windows().size());
EXPECT_EQ(2, registry->schedules(0).windows(0).machine_ids().size());
EXPECT_EQ(2, registry->machines().machines().size());
// Delete all machines from the schedule.
maintenance::Schedule schedule;
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
}
{
// Check that all statuses are removed.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_TRUE(registry->schedules(0).windows().empty());
EXPECT_TRUE(registry->machines().machines().empty());
}
}
// Creates a schedule and properly starts maintenance.
TEST_F(RegistrarTest, StartMaintenance)
{
// Machine definitions used in this test.
MachineID machine1;
machine1.set_ip("0.0.0.1");
MachineID machine2;
machine2.set_hostname("2");
MachineID machine3;
machine3.set_hostname("3");
machine3.set_ip("0.0.0.3");
Unavailability unavailability = createUnavailability(Clock::now());
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Schedule two machines for maintenance.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine1, machine2}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
// Transition machine two into `DOWN` mode.
RepeatedPtrField<MachineID> machines = createMachineList({machine2});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new StartMaintenance(machines))));
}
{
// Check that machine two is down.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(2, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(0).info().mode());
EXPECT_EQ(
MachineInfo::DOWN,
registry->machines().machines(1).info().mode());
// Schedule three machines for maintenance.
maintenance::Schedule schedule = createSchedule(
{createWindow({machine1, machine2, machine3}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
// Deactivate the two `DRAINING` machines.
RepeatedPtrField<MachineID> machines =
createMachineList({machine1, machine3});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new StartMaintenance(machines))));
}
{
// Check that all machines are down.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(3, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DOWN,
registry->machines().machines(0).info().mode());
EXPECT_EQ(
MachineInfo::DOWN,
registry->machines().machines(1).info().mode());
EXPECT_EQ(
MachineInfo::DOWN,
registry->machines().machines(2).info().mode());
}
}
// Creates a schedule and properly starts and stops maintenance.
TEST_F(RegistrarTest, StopMaintenance)
{
// Machine definitions used in this test.
MachineID machine1;
machine1.set_ip("0.0.0.1");
MachineID machine2;
machine2.set_hostname("2");
MachineID machine3;
machine3.set_hostname("3");
machine3.set_ip("0.0.0.3");
Unavailability unavailability = createUnavailability(Clock::now());
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Schdule three machines for maintenance.
maintenance::Schedule schedule = createSchedule({
createWindow({machine1, machine2}, unavailability),
createWindow({machine3}, unavailability)});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new UpdateSchedule(schedule))));
// Transition machine three into `DOWN` mode.
RepeatedPtrField<MachineID> machines = createMachineList({machine3});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new StartMaintenance(machines))));
// Transition machine three into `UP` mode.
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new StopMaintenance(machines))));
}
{
// Check that machine three and the window were removed.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->schedules().size());
EXPECT_EQ(1, registry->schedules(0).windows().size());
EXPECT_EQ(2, registry->schedules(0).windows(0).machine_ids().size());
EXPECT_EQ(2, registry->machines().machines().size());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(0).info().mode());
EXPECT_EQ(
MachineInfo::DRAINING,
registry->machines().machines(1).info().mode());
// Transition machine one and two into `DOWN` mode.
RepeatedPtrField<MachineID> machines =
createMachineList({machine1, machine2});
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new StartMaintenance(machines))));
// Transition all machines into `UP` mode.
AWAIT_READY(registrar.apply(Owned<RegistryOperation>(
new StopMaintenance(machines))));
}
{
// Check that the schedule is now empty.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_TRUE(registry->schedules().empty());
EXPECT_TRUE(registry->machines().machines().empty());
}
}
// Marks an agent for draining and checks for the appropriate data.
TEST_F(RegistrarTest, DrainAgent)
{
SlaveID notAdmittedID;
notAdmittedID.set_value("not-admitted");
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Add an agent to be marked by other operations.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
// Try to mark an unknown agent for draining.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DrainAgent(notAdmittedID, None(), false))));
}
{
// Check that the agent is admitted, but has no DrainConfig.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ(slave, registry->slaves().slaves(0).info());
EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
// No minimum capability should be added when the operation does
// not mutate anything.
EXPECT_EQ(0, registry->minimum_capabilities().size());
// Drain an admitted agent.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DrainAgent(slave.id(), None(), true))));
}
{
// Check that agent is now marked for draining.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
ASSERT_TRUE(registry->slaves().slaves(0).has_drain_info());
EXPECT_EQ(DRAINING, registry->slaves().slaves(0).drain_info().state());
EXPECT_FALSE(registry->slaves().slaves(0)
.drain_info().config().has_max_grace_period());
EXPECT_TRUE(registry->slaves().slaves(0).drain_info().config().mark_gone());
EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
// Minimum capability should be added now.
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
// Mark the agent unreachable.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
}
{
// Check that unreachable agent retains the draining.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(0, registry->slaves().slaves().size());
ASSERT_EQ(1, registry->unreachable().slaves().size());
ASSERT_TRUE(registry->unreachable().slaves(0).has_drain_info());
EXPECT_FALSE(
registry->unreachable().slaves(0)
.drain_info().config().has_max_grace_period());
EXPECT_TRUE(
registry->unreachable().slaves(0).drain_info().config().mark_gone());
EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
// Mark the agent reachable.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(slave))));
}
{
// Check that reachable agent retains the draining.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ(0, registry->unreachable().slaves().size());
ASSERT_TRUE(registry->slaves().slaves(0).has_drain_info());
EXPECT_FALSE(
registry->slaves().slaves(0)
.drain_info().config().has_max_grace_period());
EXPECT_TRUE(registry->slaves().slaves(0).drain_info().config().mark_gone());
EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
}
}
TEST_F(RegistrarTest, MarkAgentDrained)
{
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Add an agent to be marked for draining.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
// Try to mark a non-draining agent as drained. This should fail.
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new MarkAgentDrained(slave.id()))));
// Drain the admitted agent.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DrainAgent(slave.id(), None(), true))));
}
{
// Check that agent is now marked for draining.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_TRUE(registry->slaves().slaves(0).has_drain_info());
EXPECT_EQ(DRAINING, registry->slaves().slaves(0).drain_info().state());
// Transition from draining to drained.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkAgentDrained(slave.id()))));
}
{
// Check that agent is now marked drained.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
ASSERT_TRUE(registry->slaves().slaves(0).has_drain_info());
EXPECT_EQ(DRAINED, registry->slaves().slaves(0).drain_info().state());
// Try the same sequence of operations for an unreachable agent.
// First remove the agent.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new RemoveSlave(slave))));
// Add the agent back, anew.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
// Mark it unreachable.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
// Try to mark the agent drained prematurely.
AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
new MarkAgentDrained(slave.id()))));
// Now drain properly.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DrainAgent(slave.id(), None(), true))));
// And finish draining.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkAgentDrained(slave.id()))));
}
{
// Check that unreachable agent is now marked drained.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->unreachable().slaves().size());
ASSERT_TRUE(registry->unreachable().slaves(0).has_drain_info());
EXPECT_EQ(DRAINED, registry->unreachable().slaves(0).drain_info().state());
}
}
TEST_F(RegistrarTest, DeactivateAgent)
{
SlaveID notAdmittedID;
notAdmittedID.set_value("not-admitted");
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Add an agent to be marked by other operations.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
// Try to mark an unknown agent for draining.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(notAdmittedID))));
}
{
// Check that the agent is admitted and is not deactivated.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ(slave, registry->slaves().slaves(0).info());
EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
// No minimum capability should be added when the operation does
// not mutate anything.
EXPECT_EQ(0, registry->minimum_capabilities().size());
// Deactivate the admitted agent this time.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(slave.id()))));
}
{
// Check that agent is now deactivated.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
// Minimum capability should be added now.
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
// Mark the agent unreachable.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
}
{
// Check that unreachable agent retains the deactivation.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(0, registry->slaves().slaves().size());
ASSERT_EQ(1, registry->unreachable().slaves().size());
EXPECT_FALSE(registry->unreachable().slaves(0).has_drain_info());
EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
// Mark the agent reachable.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(slave))));
}
{
// Check that reachable agent retains the deactivation.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ(0, registry->unreachable().slaves().size());
EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
}
}
// Checks that reactivating agents will remove the draining/deactivated
// metadata and the AGENT_DRAINING minimum capability correctly.
TEST_F(RegistrarTest, ReactivateAgent)
{
SlaveID reachable1;
reachable1.set_value("reachable1");
SlaveID reachable2;
reachable2.set_value("reachable2");
SlaveID unreachable1;
unreachable1.set_value("unreachable1");
SlaveID unreachable2;
unreachable2.set_value("unreachable2");
SlaveInfo info1;
info1.set_hostname("localhost");
info1.mutable_id()->CopyFrom(reachable1);
SlaveInfo info2;
info2.set_hostname("localhost");
info2.mutable_id()->CopyFrom(reachable2);
SlaveInfo info3;
info3.set_hostname("localhost");
info3.mutable_id()->CopyFrom(unreachable1);
SlaveInfo info4;
info4.set_hostname("localhost");
info4.mutable_id()->CopyFrom(unreachable2);
{
// Prepare the registrar.
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
// Add all the agents.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info2))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info3))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(info4))));
// Mark two agents as unreachable.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info3, protobuf::getCurrentTime()))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info4, protobuf::getCurrentTime()))));
// Two reachable deactivated agents.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(reachable1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(reachable2))));
}
{
// Check for two deactivated agents and the minimum capability.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
EXPECT_TRUE(registry->slaves().slaves(1).deactivated());
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
// Reactivate one agent.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new ReactivateAgent(reachable1))));
}
{
// Check for one deactivated agent and
// that the minimum capability is still present.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
EXPECT_TRUE(registry->slaves().slaves(1).deactivated());
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
// Reactivate the other agent.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new ReactivateAgent(reachable2))));
}
{
// Check for one deactivated agent and
// that the minimum capability is still present.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
EXPECT_FALSE(registry->slaves().slaves(1).deactivated());
ASSERT_EQ(0, registry->minimum_capabilities().size());
// Two unreachable deactivated agents.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(unreachable1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(unreachable2))));
// Try reactivating an agent that is already active.
// This should not result in a change.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new ReactivateAgent(reachable1))));
}
{
// Again, check for two deactivated agents and the minimum capability.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
EXPECT_TRUE(registry->unreachable().slaves(1).deactivated());
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
// Reactivate one. This time, in the opposite order.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new ReactivateAgent(unreachable2))));
}
{
// Should be one deactivated agent, with minimum capability still present.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
EXPECT_FALSE(registry->unreachable().slaves(1).deactivated());
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
// Reactivate the other one.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new ReactivateAgent(unreachable1))));
}
{
// No deactivated agents, with no minimum capability.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_FALSE(registry->unreachable().slaves(0).deactivated());
EXPECT_FALSE(registry->unreachable().slaves(1).deactivated());
ASSERT_EQ(0, registry->minimum_capabilities().size());
// Now try deactivate reachable and unreachable together.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(reachable1))));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new DeactivateAgent(unreachable1))));
// We'll skip a validation step here and reactivate one right away.
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new ReactivateAgent(reachable1))));
}
{
// Minimum capability should not be removed if an unreachable agent
// is deactivated.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(2, registry->slaves().slaves().size());
ASSERT_EQ(2, registry->unreachable().slaves().size());
EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
ASSERT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ(
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
registry->minimum_capabilities(0).capability());
}
}
// Tests that adding and updating quotas in the registry works properly.
TEST_F(RegistrarTest, UpdateQuota)
{
// Helper to construct `QuotaConfig`.
auto createQuotaConfig = [](const string& role,
const string& quantitiesString,
const string& limitsString) {
QuotaConfig config;
config.set_role(role);
google::protobuf::Map<string, Value::Scalar> guarantees_;
ResourceQuantities quantities =
CHECK_NOTERROR(ResourceQuantities::fromString(quantitiesString));
foreachpair (const string& name, const Value::Scalar& scalar, quantities) {
guarantees_[name] = scalar;
}
google::protobuf::Map<string, Value::Scalar> limits_;
ResourceLimits limits =
CHECK_NOTERROR(ResourceLimits::fromString(limitsString));
foreachpair (const string& name, const Value::Scalar& scalar, limits) {
limits_[name] = scalar;
}
*config.mutable_guarantees() = std::move(guarantees_);
*config.mutable_limits() = std::move(limits_);
return config;
};
RepeatedPtrField<QuotaConfig> configs;
{
// Initially no quota and minimum capabilities are recorded.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(0, registry->quota_configs().size());
EXPECT_EQ(0, registry->minimum_capabilities().size());
// Store quota for a role with default quota.
*configs.Add() = createQuotaConfig("role1", "", "");
AWAIT_TRUE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
// Default quota is not persisted into the registry.
EXPECT_EQ(0, registry->quota_configs().size());
EXPECT_EQ(0, registry->minimum_capabilities().size());
// Update quota for `role1`.
configs.Clear();
*configs.Add() =
createQuotaConfig("role1", "cpus:1;mem:1024", "cpus:2;mem:2048");
AWAIT_TRUE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(1, registry->quota_configs().size());
Try<JSON::Array> expected = JSON::parse<JSON::Array>(
R"~(
[
{
"guarantees": {
"cpus": {
"value": 1
},
"mem": {
"value": 1024
}
},
"limits": {
"cpus": {
"value": 2
},
"mem": {
"value": 2048
}
},
"role": "role1"
}
])~");
EXPECT_EQ(
CHECK_NOTERROR(expected), JSON::protobuf(registry->quota_configs()));
// The `QUOTA_V2` capability is added to the registry.
//
// TODO(mzhu): This assumes the the registry starts empty which might not
// be in the future. Just check the presence of `QUOTA_V2`.
EXPECT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ("QUOTA_V2", registry->minimum_capabilities(0).capability());
// Update quota for "role2".
configs.Clear();
*configs.Add() =
createQuotaConfig("role2", "cpus:1;mem:1024", "cpus:2;mem:2048");
AWAIT_TRUE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
// Check that the recovered quotas match those we stored previously.
// NOTE: We assume quota messages are stored in order they have
// been added.
// TODO(alexr): Consider removing dependency on the order.
Try<JSON::Array> expected = JSON::parse<JSON::Array>(
R"~(
[
{
"guarantees": {
"cpus": {
"value": 1
},
"mem": {
"value": 1024
}
},
"limits": {
"cpus": {
"value": 2
},
"mem": {
"value": 2048
}
},
"role": "role1"
},
{
"guarantees": {
"cpus": {
"value": 1
},
"mem": {
"value": 1024
}
},
"limits": {
"cpus": {
"value": 2
},
"mem": {
"value": 2048
}
},
"role": "role2"
}
])~");
EXPECT_EQ(
CHECK_NOTERROR(expected), JSON::protobuf(registry->quota_configs()));
// TODO(mzhu): This assumes the the registry starts empty which might not
// be in the future. Just check the presence of `QUOTA_V2`.
EXPECT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ("QUOTA_V2", registry->minimum_capabilities(0).capability());
// Change quota for "role1"` and "role2"` in a single call.
configs.Clear();
*configs.Add() = createQuotaConfig("role1", "cpus:2", "cpus:4");
*configs.Add() = createQuotaConfig("role2", "cpus:2", "cpus:4");
AWAIT_TRUE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
// Check that the recovered quotas match those we stored previously.
// NOTE: We assume quota messages are stored in order they have
// been added.
// TODO(alexr): Consider removing dependency on the order.
Try<JSON::Array> expected = JSON::parse<JSON::Array>(
R"~(
[
{
"guarantees": {
"cpus": {
"value": 2
}
},
"limits": {
"cpus": {
"value": 4
}
},
"role": "role1"
},
{
"guarantees": {
"cpus": {
"value": 2
}
},
"limits": {
"cpus": {
"value": 4
}
},
"role": "role2"
}
])~");
EXPECT_EQ(
CHECK_NOTERROR(expected), JSON::protobuf(registry->quota_configs()));
// TODO(mzhu): This assumes the the registry starts empty which might not
// be in the future. Just check the presence of `QUOTA_V2`.
EXPECT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ("QUOTA_V2", registry->minimum_capabilities(0).capability());
// Reset "role2"` quota to default.
configs.Clear();
*configs.Add() = createQuotaConfig("role2", "", "");
AWAIT_TRUE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
configs.Clear();
*configs.Add() = createQuotaConfig("role1", "cpus:2", "cpus:4");
Try<JSON::Array> expected = JSON::parse<JSON::Array>(
R"~(
[
{
"guarantees": {
"cpus": {
"value": 2
}
},
"limits": {
"cpus": {
"value": 4
}
},
"role": "role1"
}
])~");
EXPECT_EQ(
CHECK_NOTERROR(expected), JSON::protobuf(registry->quota_configs()));
// TODO(mzhu): This assumes the the registry starts empty which might not
// be in the future. Just check the presence of `QUOTA_V2`.
EXPECT_EQ(1, registry->minimum_capabilities().size());
EXPECT_EQ("QUOTA_V2", registry->minimum_capabilities(0).capability());
// Reset "role1"` quota to default.
configs.Clear();
*configs.Add() = createQuotaConfig("role1", "", "");
AWAIT_TRUE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(0, registry->quota_configs().size());
// The `QUOTA_V2` capability is removed because `quota_configs` is empty.
EXPECT_EQ(0, registry->minimum_capabilities().size());
}
}
// Tests that updating quota with an invalid config fails.
TEST_F(RegistrarTest, UpdateQuotaInvalid)
{
QuotaConfig config;
config.set_role("role1");
auto resourceMap = [](const vector<pair<string, double>>& vector)
-> Map<string, Value::Scalar> {
Map<string, Value::Scalar> result;
foreachpair (const string& name, double value, vector) {
Value::Scalar scalar;
scalar.set_value(value);
result[name] = scalar;
}
return result;
};
// The quota endpoint only allows memory / disk up to
// 1 exabyte (in megabytes) or 1 trillion cores/ports/other.
// For this test, we just check 1 invalid case via mem.
double largestMegabytes = 1024.0 * 1024.0 * 1024.0 * 1024.0;
*config.mutable_limits() = resourceMap({{"mem", largestMegabytes + 1.0}});
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
EXPECT_EQ(0, registry->quota_configs().size());
EXPECT_EQ(0, registry->minimum_capabilities().size());
// Store quota for a role with default quota.
RepeatedPtrField<QuotaConfig> configs;
*configs.Add() = config;
AWAIT_FALSE(
registrar.apply(Owned<RegistryOperation>(new UpdateQuota(configs))));
}
// Tests that updating weights in the registry works properly.
TEST_F(RegistrarTest, UpdateWeights)
{
const string ROLE1 = "role1";
double WEIGHT1 = 2.0;
double UPDATED_WEIGHT1 = 1.0;
const string ROLE2 = "role2";
double WEIGHT2 = 3.5;
{
// Prepare the registrar.
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_TRUE(registry->weights().empty());
// Store the weight for 'ROLE1' previously without weight.
hashmap<string, double> weights;
weights[ROLE1] = WEIGHT1;
vector<WeightInfo> weightInfos = getWeightInfos(weights);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new UpdateWeights(weightInfos))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
// Check that the recovered weights matches the weights we stored
// previously.
ASSERT_EQ(1, registry->weights_size());
EXPECT_EQ(ROLE1, registry->weights(0).info().role());
ASSERT_EQ(WEIGHT1, registry->weights(0).info().weight());
// Change weight for 'ROLE1', and store the weight for 'ROLE2'.
hashmap<string, double> weights;
weights[ROLE1] = UPDATED_WEIGHT1;
weights[ROLE2] = WEIGHT2;
vector<WeightInfo> weightInfos = getWeightInfos(weights);
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new UpdateWeights(weightInfos))));
}
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
// Check that the recovered weights matches the weights we updated
// previously.
ASSERT_EQ(2, registry->weights_size());
EXPECT_EQ(ROLE1, registry->weights(0).info().role());
ASSERT_EQ(UPDATED_WEIGHT1, registry->weights(0).info().weight());
EXPECT_EQ(ROLE2, registry->weights(1).info().role());
ASSERT_EQ(WEIGHT2, registry->weights(1).info().weight());
}
}
TEST_F(RegistrarTest, Bootstrap)
{
// Run 1 simulates the reregistration of a slave that is not present
// in the registry.
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(slave))));
}
// Run 2 should see the slave.
{
Registrar registrar(flags, state);
Future<Registry> registry = registrar.recover(master);
AWAIT_READY(registry);
ASSERT_EQ(1, registry->slaves().slaves().size());
EXPECT_EQ(slave, registry->slaves().slaves(0).info());
}
}
class MockStorage : public Storage
{
public:
MOCK_METHOD1(get, Future<Option<Entry>>(const string&));
MOCK_METHOD2(set, Future<bool>(const Entry&, const id::UUID&));
MOCK_METHOD1(expunge, Future<bool>(const Entry&));
MOCK_METHOD0(names, Future<std::set<string>>());
};
TEST_F(RegistrarTest, FetchTimeout)
{
Clock::pause();
MockStorage storage;
State state(&storage);
Future<Nothing> get;
EXPECT_CALL(storage, get(_))
.WillOnce(DoAll(FutureSatisfy(&get),
Return(Future<Option<Entry>>())));
Registrar registrar(flags, &state);
Future<Registry> recover = registrar.recover(master);
AWAIT_READY(get);
Clock::advance(flags.registry_fetch_timeout);
AWAIT_FAILED(recover);
Clock::resume();
// Ensure the registrar fails subsequent operations.
AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
}
TEST_F(RegistrarTest, StoreTimeout)
{
Clock::pause();
MockStorage storage;
State state(&storage);
Registrar registrar(flags, &state);
EXPECT_CALL(storage, get(_))
.WillOnce(Return(None()));
Future<Nothing> set;
EXPECT_CALL(storage, set(_, _))
.WillOnce(DoAll(FutureSatisfy(&set),
Return(Future<bool>())));
Future<Registry> recover = registrar.recover(master);
AWAIT_READY(set);
Clock::advance(flags.registry_store_timeout);
AWAIT_FAILED(recover);
Clock::resume();
// Ensure the registrar fails subsequent operations.
AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
}
TEST_F(RegistrarTest, Abort)
{
MockStorage storage;
State state(&storage);
Registrar registrar(flags, &state);
EXPECT_CALL(storage, get(_))
.WillOnce(Return(None()));
EXPECT_CALL(storage, set(_, _))
.WillOnce(Return(Future<bool>(true))) // Recovery.
.WillOnce(Return(Future<bool>::failed("failure"))) // Failure.
.WillRepeatedly(Return(Future<bool>(true))); // Success.
AWAIT_READY(registrar.recover(master));
// Storage failure.
AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
// The registrar should now be aborted!
AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>(
new AdmitSlave(slave))));
}
// Tests that requests to the '/registry' endpoint are authenticated when HTTP
// authentication is enabled.
TEST_F(RegistrarTest, Authentication)
{
const string AUTHENTICATION_REALM = "realm";
Credentials credentials;
credentials.add_credentials()->CopyFrom(DEFAULT_CREDENTIAL);
Try<authentication::Authenticator*> authenticator =
BasicAuthenticatorFactory::create(AUTHENTICATION_REALM, credentials);
ASSERT_SOME(authenticator);
AWAIT_READY(authentication::setAuthenticator(
AUTHENTICATION_REALM,
Owned<authentication::Authenticator>(authenticator.get())));
Registrar registrar(flags, state, AUTHENTICATION_REALM);
// Requests without credentials should be rejected.
Future<Response> response = process::http::get(registrar.pid(), "registry");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(Unauthorized({}).status, response);
Credential badCredential;
badCredential.set_principal("bad-principal");
badCredential.set_secret("bad-secret");
// Requests with bad credentials should be rejected.
response = process::http::get(
registrar.pid(),
"registry",
None(),
createBasicAuthHeaders(badCredential));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(Unauthorized({}).status, response);
// Requests with good credentials should be permitted.
response = process::http::get(
registrar.pid(),
"registry",
None(),
createBasicAuthHeaders(DEFAULT_CREDENTIAL));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_READY(authentication::unsetAuthenticator(AUTHENTICATION_REALM));
}
class Registrar_BENCHMARK_Test
: public RegistrarTestBase,
public WithParamInterface<size_t> {};
// The Registrar benchmark tests are parameterized by the number of slaves.
INSTANTIATE_TEST_CASE_P(
SlaveCount,
Registrar_BENCHMARK_Test,
::testing::Values(10000U, 20000U, 30000U, 50000U));
TEST_P(Registrar_BENCHMARK_Test, Performance)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
Attributes attributes = Attributes::parse("foo:bar;baz:quux");
Resources resources =
Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get();
size_t slaveCount = GetParam();
// Create slaves.
vector<SlaveInfo> infos;
for (size_t i = 0; i < slaveCount; ++i) {
// Simulate real slave information.
SlaveInfo info;
info.set_hostname("localhost");
info.mutable_id()->set_value(
string("201310101658-2280333834-5050-48574-") + stringify(i));
info.mutable_resources()->MergeFrom(resources);
info.mutable_attributes()->MergeFrom(attributes);
infos.push_back(info);
}
// Admit slaves.
Stopwatch watch;
watch.start();
Future<bool> result;
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(new AdmitSlave(info)));
}
AWAIT_READY_FOR(result, Minutes(5));
LOG(INFO) << "Admitted " << slaveCount << " agents in " << watch.elapsed();
// Shuffle the slaves so we are readmitting them in random order (
// same as in production).
std::random_shuffle(infos.begin(), infos.end());
// Mark slaves reachable again. This simulates the master failing
// over, and then the previously registered agents reregistering
// with the new master.
watch.start();
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(info)));
}
AWAIT_READY_FOR(result, Minutes(5));
LOG(INFO) << "Marked " << slaveCount
<< " agents reachable in " << watch.elapsed();
// Recover slaves.
Registrar registrar2(flags, state);
watch.start();
MasterInfo info;
info.set_id("master");
info.set_ip(10000000);
info.set_port(5050);
Future<Registry> registry = registrar2.recover(info);
AWAIT_READY(registry);
LOG(INFO) << "Recovered " << slaveCount << " agents ("
<< Bytes(registry->ByteSize()) << ") in " << watch.elapsed();
// Shuffle the slaves so we are removing them in random order (same
// as in production).
std::random_shuffle(infos.begin(), infos.end());
// Remove slaves.
watch.start();
foreach (const SlaveInfo& info, infos) {
result = registrar2.apply(Owned<RegistryOperation>(new RemoveSlave(info)));
}
AWAIT_READY_FOR(result, Minutes(5));
cout << "Removed " << slaveCount << " agents in " << watch.elapsed() << endl;
}
// Test the performance of marking all registered slaves unreachable,
// then marking them reachable again. This might occur if there is a
// network partition and then the partition heals.
TEST_P(Registrar_BENCHMARK_Test, MarkUnreachableThenReachable)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
Attributes attributes = Attributes::parse("foo:bar;baz:quux");
Resources resources =
Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get();
size_t slaveCount = GetParam();
// Create slaves.
vector<SlaveInfo> infos;
for (size_t i = 0; i < slaveCount; ++i) {
// Simulate real slave information.
SlaveInfo info;
info.set_hostname("localhost");
info.mutable_id()->set_value(
string("201310101658-2280333834-5050-48574-") + stringify(i));
info.mutable_resources()->MergeFrom(resources);
info.mutable_attributes()->MergeFrom(attributes);
infos.push_back(info);
}
// Admit slaves.
Stopwatch watch;
watch.start();
Future<bool> result;
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(new AdmitSlave(info)));
}
AWAIT_READY_FOR(result, Minutes(5));
LOG(INFO) << "Admitted " << slaveCount << " agents in " << watch.elapsed();
// Shuffle the slaves so that we mark them unreachable in random
// order (same as in production).
std::random_shuffle(infos.begin(), infos.end());
// Mark all slaves unreachable.
TimeInfo unreachableTime = protobuf::getCurrentTime();
watch.start();
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info, unreachableTime)));
}
AWAIT_READY_FOR(result, Minutes(5));
cout << "Marked " << slaveCount << " agents unreachable in "
<< watch.elapsed() << endl;
// Shuffles the slaves again so that we mark them reachable in
// random order (same as in production).
std::random_shuffle(infos.begin(), infos.end());
// Mark all slaves reachable.
watch.start();
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(
new MarkSlaveReachable(info)));
}
AWAIT_READY_FOR(result, Minutes(5));
cout << "Marked " << slaveCount << " agents reachable in "
<< watch.elapsed() << endl;
}
// Test the performance of garbage collecting a large portion of the
// unreachable list in a single operation. We use a fixed percentage
// at the moment (50%).
TEST_P(Registrar_BENCHMARK_Test, GcManyAgents)
{
Registrar registrar(flags, state);
AWAIT_READY(registrar.recover(master));
Attributes attributes = Attributes::parse("foo:bar;baz:quux");
Resources resources =
Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get();
size_t slaveCount = GetParam();
// Create slaves.
vector<SlaveInfo> infos;
for (size_t i = 0; i < slaveCount; ++i) {
// Simulate real slave information.
SlaveInfo info;
info.set_hostname("localhost");
info.mutable_id()->set_value(
string("201310101658-2280333834-5050-48574-") + stringify(i));
info.mutable_resources()->MergeFrom(resources);
info.mutable_attributes()->MergeFrom(attributes);
infos.push_back(info);
}
// Admit slaves.
Stopwatch watch;
watch.start();
Future<bool> result;
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(new AdmitSlave(info)));
}
AWAIT_READY_FOR(result, Minutes(5));
LOG(INFO) << "Admitted " << slaveCount << " agents in " << watch.elapsed();
// Shuffle the slaves so that we mark them unreachable in random
// order (same as in production).
std::random_shuffle(infos.begin(), infos.end());
// Mark all slaves unreachable.
TimeInfo unreachableTime = protobuf::getCurrentTime();
watch.start();
foreach (const SlaveInfo& info, infos) {
result = registrar.apply(Owned<RegistryOperation>(
new MarkSlaveUnreachable(info, unreachableTime)));
}
AWAIT_READY_FOR(result, Minutes(5));
LOG(INFO) << "Marked " << slaveCount << " agents unreachable in "
<< watch.elapsed() << endl;
// Prepare to GC the first half of the unreachable list.
hashset<SlaveID> toRemove;
for (size_t i = 0; (i * 2) < slaveCount; i++) {
const SlaveInfo& info = infos[i];
toRemove.insert(info.id());
}
// Do GC.
watch.start();
result = registrar.apply(Owned<RegistryOperation>(
new Prune(toRemove, hashset<SlaveID>())));
AWAIT_READY_FOR(result, Minutes(5));
cout << "Garbage collected " << toRemove.size() << " agents in "
<< watch.elapsed() << endl;
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {