| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <algorithm> |
| #include <iostream> |
| #include <map> |
| #include <set> |
| #include <string> |
| #include <vector> |
| |
| #include <mesos/attributes.hpp> |
| #include <mesos/type_utils.hpp> |
| |
| #include <mesos/authentication/http/basic_authenticator_factory.hpp> |
| |
| #include <mesos/log/log.hpp> |
| |
| #include <mesos/state/log.hpp> |
| #include <mesos/state/state.hpp> |
| #include <mesos/state/storage.hpp> |
| |
| #include <process/clock.hpp> |
| #include <process/gmock.hpp> |
| #include <process/gtest.hpp> |
| #include <process/pid.hpp> |
| #include <process/process.hpp> |
| |
| #include <stout/bytes.hpp> |
| #include <stout/stopwatch.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include <stout/tests/utils.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "log/replica.hpp" |
| |
| #include "log/tool/initialize.hpp" |
| |
| #include "master/flags.hpp" |
| #include "master/maintenance.hpp" |
| #include "master/master.hpp" |
| #include "master/quota.hpp" |
| #include "master/registrar.hpp" |
| #include "master/registry_operations.hpp" |
| #include "master/weights.hpp" |
| |
| #include "tests/mesos.hpp" |
| |
| using namespace mesos::internal::master; |
| |
| using namespace process; |
| |
| using mesos::log::Log; |
| |
| using mesos::internal::log::Replica; |
| |
| using std::cout; |
| using std::endl; |
| using std::map; |
| using std::set; |
| using std::string; |
| using std::vector; |
| |
| using process::Clock; |
| using process::Owned; |
| |
| using process::http::OK; |
| using process::http::Response; |
| using process::http::Unauthorized; |
| |
| using google::protobuf::RepeatedPtrField; |
| |
| using mesos::internal::protobuf::maintenance::createMachineList; |
| using mesos::internal::protobuf::maintenance::createSchedule; |
| using mesos::internal::protobuf::maintenance::createUnavailability; |
| using mesos::internal::protobuf::maintenance::createWindow; |
| |
| using testing::_; |
| using testing::DoAll; |
| using testing::Eq; |
| using testing::Return; |
| |
| using ::testing::WithParamInterface; |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
| namespace quota = mesos::internal::master::quota; |
| |
| namespace authentication = process::http::authentication; |
| |
| using namespace mesos::maintenance; |
| using namespace mesos::quota; |
| |
| using namespace mesos::internal::master::maintenance; |
| using namespace mesos::internal::master::quota; |
| using namespace mesos::internal::master::weights; |
| |
| using mesos::http::authentication::BasicAuthenticatorFactory; |
| |
| using mesos::state::LogStorage; |
| using mesos::state::State; |
| using mesos::state::Storage; |
| |
| using state::Entry; |
| |
| |
| static vector<WeightInfo> getWeightInfos( |
| const hashmap<string, double>& weights) { |
| vector<WeightInfo> weightInfos; |
| |
| foreachpair (const string& role, double weight, weights) { |
| WeightInfo weightInfo; |
| weightInfo.set_role(role); |
| weightInfo.set_weight(weight); |
| weightInfos.push_back(weightInfo); |
| } |
| |
| return weightInfos; |
| } |
| |
| |
| // TODO(xujyan): This class copies code from LogStateTest. It would |
| // be nice to find a common location for log related base tests when |
| // there are more uses of it. |
| class RegistrarTestBase : public TemporaryDirectoryTest |
| { |
| public: |
| RegistrarTestBase() |
| : log(nullptr), |
| storage(nullptr), |
| state(nullptr), |
| replica2(nullptr) {} |
| |
| protected: |
| void SetUp() override |
| { |
| TemporaryDirectoryTest::SetUp(); |
| |
| // For initializing the replicas. |
| log::tool::Initialize initializer; |
| |
| string path1 = os::getcwd() + "/.log1"; |
| string path2 = os::getcwd() + "/.log2"; |
| |
| initializer.flags.path = path1; |
| initializer.execute(); |
| |
| initializer.flags.path = path2; |
| initializer.execute(); |
| |
| // Only create the replica for 'path2' (i.e., the second replica) |
| // as the first replica will be created when we create a Log. |
| replica2 = new Replica(path2); |
| |
| set<UPID> pids; |
| pids.insert(replica2->pid()); |
| |
| log = new Log(2, path1, pids); |
| storage = new LogStorage(log); |
| state = new State(storage); |
| |
| // Compensate for slow CI machines / VMs. |
| flags.registry_store_timeout = process::TEST_AWAIT_TIMEOUT; |
| |
| master.CopyFrom(protobuf::createMasterInfo(UPID("master@127.0.0.1:5050"))); |
| |
| SlaveID id; |
| id.set_value("1"); |
| |
| SlaveInfo info; |
| info.set_hostname("localhost"); |
| info.mutable_id()->CopyFrom(id); |
| |
| slave.CopyFrom(info); |
| } |
| |
| void TearDown() override |
| { |
| delete state; |
| delete storage; |
| delete log; |
| delete replica2; |
| |
| TemporaryDirectoryTest::TearDown(); |
| } |
| |
| Log* log; |
| Storage* storage; |
| State* state; |
| |
| Replica* replica2; |
| |
| MasterInfo master; |
| SlaveInfo slave; |
| Flags flags; |
| }; |
| |
| |
| class RegistrarTest : public RegistrarTestBase {}; |
| |
| |
| TEST_F(RegistrarTest, Recover) |
| { |
| Registrar registrar(flags, state); |
| |
| // Operations preceding recovery will fail. |
| AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave)))); |
| AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(slave, protobuf::getCurrentTime())))); |
| AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(slave)))); |
| AWAIT_EXPECT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(slave)))); |
| |
| Future<Registry> registry = registrar.recover(master); |
| |
| // Before waiting for the recovery to complete, invoke some |
| // operations to ensure they do not fail. |
| Future<bool> admit = registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave))); |
| Future<bool> unreachable = registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))); |
| Future<bool> reachable = registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(slave))); |
| Future<bool> remove = registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(slave))); |
| |
| AWAIT_READY(registry); |
| EXPECT_EQ(master, registry->master().info()); |
| |
| AWAIT_TRUE(admit); |
| AWAIT_TRUE(unreachable); |
| AWAIT_TRUE(reachable); |
| AWAIT_TRUE(remove); |
| } |
| |
| |
| TEST_F(RegistrarTest, Admit) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(new AdmitSlave(slave)))); |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(new AdmitSlave(slave)))); |
| } |
| |
| |
| TEST_F(RegistrarTest, UpdateSlave) |
| { |
| // Add a new slave to the registry. |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| slave.set_hostname("original"); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave)))); |
| } |
| |
| |
| // Verify that the slave is present, and update its hostname. |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| ASSERT_EQ(1, registry->slaves().slaves().size()); |
| EXPECT_EQ("original", registry->slaves().slaves(0).info().hostname()); |
| |
| slave.set_hostname("changed"); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSlave(slave)))); |
| } |
| |
| // Verify that the hostname indeed changed, and do one additional update |
| // to check that the operation is idempotent. |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| ASSERT_EQ(1, registry->slaves().slaves().size()); |
| EXPECT_EQ("changed", registry->slaves().slaves(0).info().hostname()); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSlave(slave)))); |
| } |
| |
| // Verify that nothing changed from the second update. |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| ASSERT_EQ(1, registry->slaves().slaves().size()); |
| EXPECT_EQ("changed", registry->slaves().slaves(0).info().hostname()); |
| } |
| } |
| |
| |
| TEST_F(RegistrarTest, MarkReachable) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| SlaveID id1; |
| id1.set_value("1"); |
| |
| SlaveInfo info1; |
| info1.set_hostname("localhost"); |
| info1.mutable_id()->CopyFrom(id1); |
| |
| SlaveID id2; |
| id2.set_value("2"); |
| |
| SlaveInfo info2; |
| info2.set_hostname("localhost"); |
| info2.mutable_id()->CopyFrom(id2); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| |
| // Attempting to mark a slave as reachable that is already reachable |
| // should not result in an error. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(info1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(info1)))); |
| |
| // Attempting to mark a slave as reachable that is not in the |
| // unreachable list should not result in error. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(info2)))); |
| } |
| |
| |
| TEST_F(RegistrarTest, MarkUnreachable) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| SlaveID id1; |
| id1.set_value("1"); |
| |
| SlaveInfo info1; |
| info1.set_hostname("localhost"); |
| info1.mutable_id()->CopyFrom(id1); |
| |
| SlaveID id2; |
| id2.set_value("2"); |
| |
| SlaveInfo info2; |
| info2.set_hostname("localhost"); |
| info2.mutable_id()->CopyFrom(id2); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(info1)))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| |
| // If a slave is already unreachable, trying to mark it unreachable |
| // again should fail. |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| } |
| |
| |
| // Verify that an admitted slave can be marked as gone. |
| TEST_F(RegistrarTest, MarkGone) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| SlaveID id1; |
| id1.set_value("1"); |
| |
| SlaveInfo info1; |
| info1.set_hostname("localhost"); |
| info1.mutable_id()->CopyFrom(id1); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveGone(info1.id(), protobuf::getCurrentTime())))); |
| } |
| |
| |
| // Verify that an unreachable slave can be marked as gone. |
| TEST_F(RegistrarTest, MarkUnreachableGone) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| SlaveID id1; |
| id1.set_value("1"); |
| |
| SlaveInfo info1; |
| info1.set_hostname("localhost"); |
| info1.mutable_id()->CopyFrom(id1); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveGone(info1.id(), protobuf::getCurrentTime())))); |
| |
| // If a slave is already gone, trying to mark it gone again should fail. |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveGone(info1.id(), protobuf::getCurrentTime())))); |
| |
| // If a slave is already gone, trying to mark it unreachable |
| // again should fail. |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| } |
| |
| |
| TEST_F(RegistrarTest, Prune) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| SlaveID id1; |
| id1.set_value("1"); |
| |
| SlaveInfo info1; |
| info1.set_hostname("localhost"); |
| info1.mutable_id()->CopyFrom(id1); |
| |
| SlaveID id2; |
| id2.set_value("2"); |
| |
| SlaveInfo info2; |
| info2.set_hostname("localhost"); |
| info2.mutable_id()->CopyFrom(id2); |
| |
| SlaveID id3; |
| id3.set_value("3"); |
| |
| SlaveInfo info3; |
| info3.set_hostname("localhost"); |
| info3.mutable_id()->CopyFrom(id3); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info2)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info3)))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info2, protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveGone(info3.id(), protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new Prune({id1}, {})))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new Prune({id2}, {})))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new Prune({}, {id3})))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info2)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info3)))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info1, protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info2, protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveGone(info3.id(), protobuf::getCurrentTime())))); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new Prune({id1}, {})))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new Prune({id2}, {})))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new Prune({}, {id3})))); |
| } |
| |
| |
| TEST_F(RegistrarTest, Remove) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| SlaveID id1; |
| id1.set_value("1"); |
| |
| SlaveInfo info1; |
| info1.set_hostname("localhost"); |
| info1.mutable_id()->CopyFrom(id1); |
| |
| SlaveID id2; |
| id2.set_value("2"); |
| |
| SlaveInfo info2; |
| info2.set_hostname("localhost"); |
| info2.mutable_id()->CopyFrom(id2); |
| |
| SlaveID id3; |
| id3.set_value("3"); |
| |
| SlaveInfo info3; |
| info3.set_hostname("localhost"); |
| info3.mutable_id()->CopyFrom(id3); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info2)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info3)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(info1)))); |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(info1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(info1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(info2)))); |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(info2)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(info3)))); |
| AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveSlave(info3)))); |
| } |
| |
| |
| // NOTE: For the following tests, the state of the registrar can |
| // only be viewed once per instantiation of the registrar. |
| // To check the result of each operation, we must re-construct |
| // the registrar, which is done by putting the code into scoped blocks. |
| |
| // TODO(josephw): Consider refactoring these maintenance operation tests |
| // to use a helper function for each un-named scoped block. |
| // For example: |
| // MaintenanceTest(flags, state, [=](const Registry& registry) { |
| // // Checks and operations. i.e.: |
| // EXPECT_EQ(1, registry->schedules().size()); |
| // }); |
| |
| // Adds maintenance schedules to the registry, one machine at a time. |
| // Then removes machines from the schedule. |
| TEST_F(RegistrarTest, UpdateMaintenanceSchedule) |
| { |
| // Machine definitions used in this test. |
| MachineID machine1; |
| machine1.set_ip("0.0.0.1"); |
| |
| MachineID machine2; |
| machine2.set_hostname("2"); |
| |
| MachineID machine3; |
| machine3.set_hostname("3"); |
| machine3.set_ip("0.0.0.3"); |
| |
| Unavailability unavailability = createUnavailability(Clock::now()); |
| |
| { |
| // Prepare the registrar. |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| // Schedule one machine for maintenance. |
| maintenance::Schedule schedule = createSchedule( |
| {createWindow({machine1}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| } |
| |
| { |
| // Check that one schedule and one machine info was made. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows(0).machine_ids().size()); |
| EXPECT_EQ(1, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(0).info().mode()); |
| |
| // Extend the schedule by one machine (in a different window). |
| maintenance::Schedule schedule = createSchedule({ |
| createWindow({machine1}, unavailability), |
| createWindow({machine2}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| } |
| |
| { |
| // Check that both machines are part of maintenance. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_EQ(2, registry->schedules(0).windows().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows(0).machine_ids().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows(1).machine_ids().size()); |
| EXPECT_EQ(2, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(1).info().mode()); |
| |
| // Extend a window by one machine. |
| maintenance::Schedule schedule = createSchedule({ |
| createWindow({machine1}, unavailability), |
| createWindow({machine2, machine3}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| } |
| |
| { |
| // Check that all three machines are included. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_EQ(2, registry->schedules(0).windows().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows(0).machine_ids().size()); |
| EXPECT_EQ(2, registry->schedules(0).windows(1).machine_ids().size()); |
| EXPECT_EQ(3, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(2).info().mode()); |
| |
| // Rearrange the schedule into one window. |
| maintenance::Schedule schedule = createSchedule( |
| {createWindow({machine1, machine2, machine3}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| } |
| |
| { |
| // Check that the machine infos are unchanged, but the schedule is. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows().size()); |
| EXPECT_EQ(3, registry->schedules(0).windows(0).machine_ids().size()); |
| EXPECT_EQ(3, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(0).info().mode()); |
| |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(1).info().mode()); |
| |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(2).info().mode()); |
| |
| // Delete one machine from the schedule. |
| maintenance::Schedule schedule = createSchedule( |
| {createWindow({machine2, machine3}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| } |
| |
| { |
| // Check that one machine info is removed. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows().size()); |
| EXPECT_EQ(2, registry->schedules(0).windows(0).machine_ids().size()); |
| EXPECT_EQ(2, registry->machines().machines().size()); |
| |
| // Delete all machines from the schedule. |
| maintenance::Schedule schedule; |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| } |
| |
| { |
| // Check that all statuses are removed. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_TRUE(registry->schedules(0).windows().empty()); |
| EXPECT_TRUE(registry->machines().machines().empty()); |
| } |
| } |
| |
| |
| // Creates a schedule and properly starts maintenance. |
| TEST_F(RegistrarTest, StartMaintenance) |
| { |
| // Machine definitions used in this test. |
| MachineID machine1; |
| machine1.set_ip("0.0.0.1"); |
| |
| MachineID machine2; |
| machine2.set_hostname("2"); |
| |
| MachineID machine3; |
| machine3.set_hostname("3"); |
| machine3.set_ip("0.0.0.3"); |
| |
| Unavailability unavailability = createUnavailability(Clock::now()); |
| |
| { |
| // Prepare the registrar. |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| // Schedule two machines for maintenance. |
| maintenance::Schedule schedule = createSchedule( |
| {createWindow({machine1, machine2}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| |
| // Transition machine two into `DOWN` mode. |
| RepeatedPtrField<MachineID> machines = createMachineList({machine2}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new StartMaintenance(machines)))); |
| } |
| |
| { |
| // Check that machine two is down. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(2, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(0).info().mode()); |
| |
| EXPECT_EQ( |
| MachineInfo::DOWN, |
| registry->machines().machines(1).info().mode()); |
| |
| // Schedule three machines for maintenance. |
| maintenance::Schedule schedule = createSchedule( |
| {createWindow({machine1, machine2, machine3}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| |
| // Deactivate the two `DRAINING` machines. |
| RepeatedPtrField<MachineID> machines = |
| createMachineList({machine1, machine3}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new StartMaintenance(machines)))); |
| } |
| |
| { |
| // Check that all machines are down. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(3, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DOWN, |
| registry->machines().machines(0).info().mode()); |
| |
| EXPECT_EQ( |
| MachineInfo::DOWN, |
| registry->machines().machines(1).info().mode()); |
| |
| EXPECT_EQ( |
| MachineInfo::DOWN, |
| registry->machines().machines(2).info().mode()); |
| } |
| } |
| |
| |
| // Creates a schedule and properly starts and stops maintenance. |
| TEST_F(RegistrarTest, StopMaintenance) |
| { |
| // Machine definitions used in this test. |
| MachineID machine1; |
| machine1.set_ip("0.0.0.1"); |
| |
| MachineID machine2; |
| machine2.set_hostname("2"); |
| |
| MachineID machine3; |
| machine3.set_hostname("3"); |
| machine3.set_ip("0.0.0.3"); |
| |
| Unavailability unavailability = createUnavailability(Clock::now()); |
| |
| { |
| // Prepare the registrar. |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| // Schdule three machines for maintenance. |
| maintenance::Schedule schedule = createSchedule({ |
| createWindow({machine1, machine2}, unavailability), |
| createWindow({machine3}, unavailability)}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new UpdateSchedule(schedule)))); |
| |
| // Transition machine three into `DOWN` mode. |
| RepeatedPtrField<MachineID> machines = createMachineList({machine3}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new StartMaintenance(machines)))); |
| |
| // Transition machine three into `UP` mode. |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new StopMaintenance(machines)))); |
| } |
| |
| { |
| // Check that machine three and the window were removed. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_EQ(1, registry->schedules().size()); |
| EXPECT_EQ(1, registry->schedules(0).windows().size()); |
| EXPECT_EQ(2, registry->schedules(0).windows(0).machine_ids().size()); |
| EXPECT_EQ(2, registry->machines().machines().size()); |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(0).info().mode()); |
| |
| EXPECT_EQ( |
| MachineInfo::DRAINING, |
| registry->machines().machines(1).info().mode()); |
| |
| // Transition machine one and two into `DOWN` mode. |
| RepeatedPtrField<MachineID> machines = |
| createMachineList({machine1, machine2}); |
| |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new StartMaintenance(machines)))); |
| |
| // Transition all machines into `UP` mode. |
| AWAIT_READY(registrar.apply(Owned<RegistryOperation>( |
| new StopMaintenance(machines)))); |
| } |
| |
| { |
| // Check that the schedule is now empty. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| EXPECT_TRUE(registry->schedules().empty()); |
| EXPECT_TRUE(registry->machines().machines().empty()); |
| } |
| } |
| |
| |
| // Tests that adding and updating quotas in the registry works properly. |
| TEST_F(RegistrarTest, UpdateQuota) |
| { |
| const string ROLE1 = "role1"; |
| const string ROLE2 = "role2"; |
| |
| // NOTE: `quotaResources1` yields a collection with two `Resource` |
| // objects once converted to `RepeatedPtrField`. |
| Resources quotaResources1 = Resources::parse("cpus:1;mem:1024").get(); |
| Resources quotaResources2 = Resources::parse("cpus:2").get(); |
| |
| // Prepare `QuotaInfo` protobufs used in the test. |
| QuotaInfo quota1; |
| quota1.set_role(ROLE1); |
| quota1.mutable_guarantee()->CopyFrom(quotaResources1); |
| |
| Option<Error> validateError1 = quota::validation::quotaInfo(quota1); |
| EXPECT_NONE(validateError1); |
| |
| QuotaInfo quota2; |
| quota2.set_role(ROLE2); |
| quota2.mutable_guarantee()->CopyFrom(quotaResources1); |
| |
| Option<Error> validateError2 = quota::validation::quotaInfo(quota2); |
| EXPECT_NONE(validateError2); |
| |
| { |
| // Prepare the registrar; see the comment above why we need to do this in |
| // every scope. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Store quota for a role without quota. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateQuota(quota1)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered quota matches the one we stored. |
| ASSERT_EQ(1, registry->quotas().size()); |
| EXPECT_EQ(ROLE1, registry->quotas(0).info().role()); |
| ASSERT_EQ(2, registry->quotas(0).info().guarantee().size()); |
| |
| Resources storedResources(registry->quotas(0).info().guarantee()); |
| EXPECT_EQ(quotaResources1, storedResources); |
| |
| // Change quota for `ROLE1`. |
| quota1.mutable_guarantee()->CopyFrom(quotaResources2); |
| |
| // Update the only stored quota. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateQuota(quota1)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered quota matches the one we updated. |
| ASSERT_EQ(1, registry->quotas().size()); |
| EXPECT_EQ(ROLE1, registry->quotas(0).info().role()); |
| ASSERT_EQ(1, registry->quotas(0).info().guarantee().size()); |
| |
| Resources storedResources(registry->quotas(0).info().guarantee()); |
| EXPECT_EQ(quotaResources2, storedResources); |
| |
| // Store one more quota for a role without quota. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateQuota(quota2)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered quotas match those we stored previously. |
| // NOTE: We assume quota messages are stored in order they have |
| // been added. |
| // TODO(alexr): Consider removing dependency on the order. |
| ASSERT_EQ(2, registry->quotas().size()); |
| EXPECT_EQ(ROLE1, registry->quotas(0).info().role()); |
| ASSERT_EQ(1, registry->quotas(0).info().guarantee().size()); |
| |
| EXPECT_EQ(ROLE2, registry->quotas(1).info().role()); |
| ASSERT_EQ(2, registry->quotas(1).info().guarantee().size()); |
| |
| Resources storedResources(registry->quotas(1).info().guarantee()); |
| EXPECT_EQ(quotaResources1, storedResources); |
| |
| // Change quota for `role2`. |
| quota2.mutable_guarantee()->CopyFrom(quotaResources2); |
| |
| // Update quota for `role2` in presence of multiple quotas. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateQuota(quota2)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered quotas match those we stored and updated |
| // previously. |
| // NOTE: We assume quota messages are stored in order they have been |
| // added and update does not change the order. |
| // TODO(alexr): Consider removing dependency on the order. |
| ASSERT_EQ(2, registry->quotas().size()); |
| |
| EXPECT_EQ(ROLE1, registry->quotas(0).info().role()); |
| ASSERT_EQ(1, registry->quotas(0).info().guarantee().size()); |
| |
| Resources storedResources1(registry->quotas(0).info().guarantee()); |
| EXPECT_EQ(quotaResources2, storedResources1); |
| |
| EXPECT_EQ(ROLE2, registry->quotas(1).info().role()); |
| ASSERT_EQ(1, registry->quotas(1).info().guarantee().size()); |
| |
| Resources storedResources2(registry->quotas(1).info().guarantee()); |
| EXPECT_EQ(quotaResources2, storedResources2); |
| } |
| } |
| |
| |
| // Tests removing quotas from the registry. |
| TEST_F(RegistrarTest, RemoveQuota) |
| { |
| const string ROLE1 = "role1"; |
| const string ROLE2 = "role2"; |
| |
| { |
| // Prepare the registrar; see the comment above why we need to do this in |
| // every scope. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // NOTE: `quotaResources` yields a collection with two `Resource` |
| // objects once converted to `RepeatedPtrField`. |
| Resources quotaResources1 = Resources::parse("cpus:1;mem:1024").get(); |
| Resources quotaResources2 = Resources::parse("cpus:2").get(); |
| |
| // Prepare `QuotaInfo` protobufs. |
| QuotaInfo quota1; |
| quota1.set_role(ROLE1); |
| quota1.mutable_guarantee()->CopyFrom(quotaResources1); |
| |
| Option<Error> validateError1 = quota::validation::quotaInfo(quota1); |
| EXPECT_NONE(validateError1); |
| |
| QuotaInfo quota2; |
| quota2.set_role(ROLE2); |
| quota2.mutable_guarantee()->CopyFrom(quotaResources2); |
| |
| Option<Error> validateError2 = quota::validation::quotaInfo(quota2); |
| EXPECT_NONE(validateError2); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateQuota(quota1)))); |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateQuota(quota2)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered quotas match those we stored previously. |
| // NOTE: We assume quota messages are stored in order they have been |
| // added. |
| // TODO(alexr): Consider removing dependency on the order. |
| ASSERT_EQ(2, registry->quotas().size()); |
| EXPECT_EQ(ROLE1, registry->quotas(0).info().role()); |
| EXPECT_EQ(ROLE2, registry->quotas(1).info().role()); |
| |
| // Remove quota for `role2`. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveQuota(ROLE2)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that there is only one quota left in the registry. |
| ASSERT_EQ(1, registry->quotas().size()); |
| EXPECT_EQ(ROLE1, registry->quotas(0).info().role()); |
| |
| // Remove quota for `ROLE1`. |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new RemoveQuota(ROLE1)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that there are no more quotas at this point. |
| ASSERT_TRUE(registry->quotas().empty()); |
| } |
| } |
| |
| |
| // Tests that updating weights in the registry works properly. |
| TEST_F(RegistrarTest, UpdateWeights) |
| { |
| const string ROLE1 = "role1"; |
| double WEIGHT1 = 2.0; |
| double UPDATED_WEIGHT1 = 1.0; |
| |
| const string ROLE2 = "role2"; |
| double WEIGHT2 = 3.5; |
| |
| { |
| // Prepare the registrar. |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| ASSERT_TRUE(registry->weights().empty()); |
| |
| // Store the weight for 'ROLE1' previously without weight. |
| hashmap<string, double> weights; |
| weights[ROLE1] = WEIGHT1; |
| vector<WeightInfo> weightInfos = getWeightInfos(weights); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateWeights(weightInfos)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered weights matches the weights we stored |
| // previously. |
| ASSERT_EQ(1, registry->weights_size()); |
| EXPECT_EQ(ROLE1, registry->weights(0).info().role()); |
| ASSERT_EQ(WEIGHT1, registry->weights(0).info().weight()); |
| |
| // Change weight for 'ROLE1', and store the weight for 'ROLE2'. |
| hashmap<string, double> weights; |
| weights[ROLE1] = UPDATED_WEIGHT1; |
| weights[ROLE2] = WEIGHT2; |
| vector<WeightInfo> weightInfos = getWeightInfos(weights); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new UpdateWeights(weightInfos)))); |
| } |
| |
| { |
| Registrar registrar(flags, state); |
| Future<Registry> registry = registrar.recover(master); |
| AWAIT_READY(registry); |
| |
| // Check that the recovered weights matches the weights we updated |
| // previously. |
| ASSERT_EQ(2, registry->weights_size()); |
| EXPECT_EQ(ROLE1, registry->weights(0).info().role()); |
| ASSERT_EQ(UPDATED_WEIGHT1, registry->weights(0).info().weight()); |
| EXPECT_EQ(ROLE2, registry->weights(1).info().role()); |
| ASSERT_EQ(WEIGHT2, registry->weights(1).info().weight()); |
| } |
| } |
| |
| |
| TEST_F(RegistrarTest, Bootstrap) |
| { |
| // Run 1 simulates the reregistration of a slave that is not present |
| // in the registry. |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(slave)))); |
| } |
| |
| // Run 2 should see the slave. |
| { |
| Registrar registrar(flags, state); |
| |
| Future<Registry> registry = registrar.recover(master); |
| |
| AWAIT_READY(registry); |
| |
| ASSERT_EQ(1, registry->slaves().slaves().size()); |
| EXPECT_EQ(slave, registry->slaves().slaves(0).info()); |
| } |
| } |
| |
| |
| class MockStorage : public Storage |
| { |
| public: |
| MOCK_METHOD1(get, Future<Option<Entry>>(const string&)); |
| MOCK_METHOD2(set, Future<bool>(const Entry&, const id::UUID&)); |
| MOCK_METHOD1(expunge, Future<bool>(const Entry&)); |
| MOCK_METHOD0(names, Future<std::set<string>>()); |
| }; |
| |
| |
| TEST_F(RegistrarTest, FetchTimeout) |
| { |
| Clock::pause(); |
| |
| MockStorage storage; |
| State state(&storage); |
| |
| Future<Nothing> get; |
| EXPECT_CALL(storage, get(_)) |
| .WillOnce(DoAll(FutureSatisfy(&get), |
| Return(Future<Option<Entry>>()))); |
| |
| Registrar registrar(flags, &state); |
| |
| Future<Registry> recover = registrar.recover(master); |
| |
| AWAIT_READY(get); |
| |
| Clock::advance(flags.registry_fetch_timeout); |
| |
| AWAIT_FAILED(recover); |
| |
| Clock::resume(); |
| |
| // Ensure the registrar fails subsequent operations. |
| AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave)))); |
| } |
| |
| |
| TEST_F(RegistrarTest, StoreTimeout) |
| { |
| Clock::pause(); |
| |
| MockStorage storage; |
| State state(&storage); |
| |
| Registrar registrar(flags, &state); |
| |
| EXPECT_CALL(storage, get(_)) |
| .WillOnce(Return(None())); |
| |
| Future<Nothing> set; |
| EXPECT_CALL(storage, set(_, _)) |
| .WillOnce(DoAll(FutureSatisfy(&set), |
| Return(Future<bool>()))); |
| |
| Future<Registry> recover = registrar.recover(master); |
| |
| AWAIT_READY(set); |
| |
| Clock::advance(flags.registry_store_timeout); |
| |
| AWAIT_FAILED(recover); |
| |
| Clock::resume(); |
| |
| // Ensure the registrar fails subsequent operations. |
| AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave)))); |
| } |
| |
| |
| TEST_F(RegistrarTest, Abort) |
| { |
| MockStorage storage; |
| State state(&storage); |
| |
| Registrar registrar(flags, &state); |
| |
| EXPECT_CALL(storage, get(_)) |
| .WillOnce(Return(None())); |
| |
| EXPECT_CALL(storage, set(_, _)) |
| .WillOnce(Return(Future<bool>(true))) // Recovery. |
| .WillOnce(Return(Future<bool>::failed("failure"))) // Failure. |
| .WillRepeatedly(Return(Future<bool>(true))); // Success. |
| |
| AWAIT_READY(registrar.recover(master)); |
| |
| // Storage failure. |
| AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave)))); |
| |
| // The registrar should now be aborted! |
| AWAIT_FAILED(registrar.apply(Owned<RegistryOperation>( |
| new AdmitSlave(slave)))); |
| } |
| |
| |
| // Tests that requests to the '/registry' endpoint are authenticated when HTTP |
| // authentication is enabled. |
| TEST_F(RegistrarTest, Authentication) |
| { |
| const string AUTHENTICATION_REALM = "realm"; |
| |
| Credentials credentials; |
| credentials.add_credentials()->CopyFrom(DEFAULT_CREDENTIAL); |
| |
| Try<authentication::Authenticator*> authenticator = |
| BasicAuthenticatorFactory::create(AUTHENTICATION_REALM, credentials); |
| |
| ASSERT_SOME(authenticator); |
| |
| AWAIT_READY(authentication::setAuthenticator( |
| AUTHENTICATION_REALM, |
| Owned<authentication::Authenticator>(authenticator.get()))); |
| |
| Registrar registrar(flags, state, AUTHENTICATION_REALM); |
| |
| // Requests without credentials should be rejected. |
| Future<Response> response = process::http::get(registrar.pid(), "registry"); |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(Unauthorized({}).status, response); |
| |
| Credential badCredential; |
| badCredential.set_principal("bad-principal"); |
| badCredential.set_secret("bad-secret"); |
| |
| // Requests with bad credentials should be rejected. |
| response = process::http::get( |
| registrar.pid(), |
| "registry", |
| None(), |
| createBasicAuthHeaders(badCredential)); |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(Unauthorized({}).status, response); |
| |
| // Requests with good credentials should be permitted. |
| response = process::http::get( |
| registrar.pid(), |
| "registry", |
| None(), |
| createBasicAuthHeaders(DEFAULT_CREDENTIAL)); |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| |
| AWAIT_READY(authentication::unsetAuthenticator(AUTHENTICATION_REALM)); |
| } |
| |
| |
| class Registrar_BENCHMARK_Test |
| : public RegistrarTestBase, |
| public WithParamInterface<size_t> {}; |
| |
| |
| // The Registrar benchmark tests are parameterized by the number of slaves. |
| INSTANTIATE_TEST_CASE_P( |
| SlaveCount, |
| Registrar_BENCHMARK_Test, |
| ::testing::Values(10000U, 20000U, 30000U, 50000U)); |
| |
| |
| TEST_P(Registrar_BENCHMARK_Test, Performance) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| Attributes attributes = Attributes::parse("foo:bar;baz:quux"); |
| Resources resources = |
| Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get(); |
| |
| size_t slaveCount = GetParam(); |
| |
| // Create slaves. |
| vector<SlaveInfo> infos; |
| for (size_t i = 0; i < slaveCount; ++i) { |
| // Simulate real slave information. |
| SlaveInfo info; |
| info.set_hostname("localhost"); |
| info.mutable_id()->set_value( |
| string("201310101658-2280333834-5050-48574-") + stringify(i)); |
| info.mutable_resources()->MergeFrom(resources); |
| info.mutable_attributes()->MergeFrom(attributes); |
| infos.push_back(info); |
| } |
| |
| // Admit slaves. |
| Stopwatch watch; |
| watch.start(); |
| Future<bool> result; |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>(new AdmitSlave(info))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| LOG(INFO) << "Admitted " << slaveCount << " agents in " << watch.elapsed(); |
| |
| // Shuffle the slaves so we are readmitting them in random order ( |
| // same as in production). |
| std::random_shuffle(infos.begin(), infos.end()); |
| |
| // Mark slaves reachable again. This simulates the master failing |
| // over, and then the previously registered agents reregistering |
| // with the new master. |
| watch.start(); |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(info))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| LOG(INFO) << "Marked " << slaveCount |
| << " agents reachable in " << watch.elapsed(); |
| |
| // Recover slaves. |
| Registrar registrar2(flags, state); |
| watch.start(); |
| MasterInfo info; |
| info.set_id("master"); |
| info.set_ip(10000000); |
| info.set_port(5050); |
| Future<Registry> registry = registrar2.recover(info); |
| AWAIT_READY(registry); |
| LOG(INFO) << "Recovered " << slaveCount << " agents (" |
| << Bytes(registry->ByteSize()) << ") in " << watch.elapsed(); |
| |
| // Shuffle the slaves so we are removing them in random order (same |
| // as in production). |
| std::random_shuffle(infos.begin(), infos.end()); |
| |
| // Remove slaves. |
| watch.start(); |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar2.apply(Owned<RegistryOperation>(new RemoveSlave(info))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| cout << "Removed " << slaveCount << " agents in " << watch.elapsed() << endl; |
| } |
| |
| |
| // Test the performance of marking all registered slaves unreachable, |
| // then marking them reachable again. This might occur if there is a |
| // network partition and then the partition heals. |
| TEST_P(Registrar_BENCHMARK_Test, MarkUnreachableThenReachable) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| Attributes attributes = Attributes::parse("foo:bar;baz:quux"); |
| Resources resources = |
| Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get(); |
| |
| size_t slaveCount = GetParam(); |
| |
| // Create slaves. |
| vector<SlaveInfo> infos; |
| for (size_t i = 0; i < slaveCount; ++i) { |
| // Simulate real slave information. |
| SlaveInfo info; |
| info.set_hostname("localhost"); |
| info.mutable_id()->set_value( |
| string("201310101658-2280333834-5050-48574-") + stringify(i)); |
| info.mutable_resources()->MergeFrom(resources); |
| info.mutable_attributes()->MergeFrom(attributes); |
| infos.push_back(info); |
| } |
| |
| // Admit slaves. |
| Stopwatch watch; |
| watch.start(); |
| Future<bool> result; |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>(new AdmitSlave(info))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| LOG(INFO) << "Admitted " << slaveCount << " agents in " << watch.elapsed(); |
| |
| // Shuffle the slaves so that we mark them unreachable in random |
| // order (same as in production). |
| std::random_shuffle(infos.begin(), infos.end()); |
| |
| // Mark all slaves unreachable. |
| TimeInfo unreachableTime = protobuf::getCurrentTime(); |
| |
| watch.start(); |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info, unreachableTime))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| cout << "Marked " << slaveCount << " agents unreachable in " |
| << watch.elapsed() << endl; |
| |
| // Shuffles the slaves again so that we mark them reachable in |
| // random order (same as in production). |
| std::random_shuffle(infos.begin(), infos.end()); |
| |
| // Mark all slaves reachable. |
| watch.start(); |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveReachable(info))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| cout << "Marked " << slaveCount << " agents reachable in " |
| << watch.elapsed() << endl; |
| } |
| |
| |
| // Test the performance of garbage collecting a large portion of the |
| // unreachable list in a single operation. We use a fixed percentage |
| // at the moment (50%). |
| TEST_P(Registrar_BENCHMARK_Test, GcManyAgents) |
| { |
| Registrar registrar(flags, state); |
| AWAIT_READY(registrar.recover(master)); |
| |
| Attributes attributes = Attributes::parse("foo:bar;baz:quux"); |
| Resources resources = |
| Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get(); |
| |
| size_t slaveCount = GetParam(); |
| |
| // Create slaves. |
| vector<SlaveInfo> infos; |
| for (size_t i = 0; i < slaveCount; ++i) { |
| // Simulate real slave information. |
| SlaveInfo info; |
| info.set_hostname("localhost"); |
| info.mutable_id()->set_value( |
| string("201310101658-2280333834-5050-48574-") + stringify(i)); |
| info.mutable_resources()->MergeFrom(resources); |
| info.mutable_attributes()->MergeFrom(attributes); |
| infos.push_back(info); |
| } |
| |
| // Admit slaves. |
| Stopwatch watch; |
| watch.start(); |
| Future<bool> result; |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>(new AdmitSlave(info))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| LOG(INFO) << "Admitted " << slaveCount << " agents in " << watch.elapsed(); |
| |
| // Shuffle the slaves so that we mark them unreachable in random |
| // order (same as in production). |
| std::random_shuffle(infos.begin(), infos.end()); |
| |
| // Mark all slaves unreachable. |
| TimeInfo unreachableTime = protobuf::getCurrentTime(); |
| |
| watch.start(); |
| foreach (const SlaveInfo& info, infos) { |
| result = registrar.apply(Owned<RegistryOperation>( |
| new MarkSlaveUnreachable(info, unreachableTime))); |
| } |
| AWAIT_READY_FOR(result, Minutes(5)); |
| LOG(INFO) << "Marked " << slaveCount << " agents unreachable in " |
| << watch.elapsed() << endl; |
| |
| // Prepare to GC the first half of the unreachable list. |
| hashset<SlaveID> toRemove; |
| for (size_t i = 0; (i * 2) < slaveCount; i++) { |
| const SlaveInfo& info = infos[i]; |
| toRemove.insert(info.id()); |
| } |
| |
| // Do GC. |
| watch.start(); |
| result = registrar.apply(Owned<RegistryOperation>( |
| new Prune(toRemove, hashset<SlaveID>()))); |
| |
| AWAIT_READY_FOR(result, Minutes(5)); |
| cout << "Garbage collected " << toRemove.size() << " agents in " |
| << watch.elapsed() << endl; |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |