blob: cd83bf09313db1ee1825c30bb5e07e386622e668 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <map>
#include <set>
#include <string>
#include <vector>

#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>

#include <stout/bytes.hpp>
#include <stout/nothing.hpp>
#include <stout/stopwatch.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>

#include "common/protobuf_utils.hpp"
#include "common/type_utils.hpp"
#include "log/log.hpp"
#include "log/replica.hpp"
#include "log/tool/initialize.hpp"
#include "messages/state.hpp"
#include "master/flags.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
#include "state/storage.hpp"
#include "tests/utils.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace process;
using log::Log;
using log::Replica;
using state::Entry;
using state::LogStorage;
using state::Storage;
using state::protobuf::State;
using std::map;
using std::set;
using std::string;
using std::vector;
using testing::_;
using testing::DoAll;
using testing::Eq;
using testing::Return;
using mesos::internal::tests::TemporaryDirectoryTest;
using ::testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace master {
// TODO(xujyan): This class copies code from LogStateTest. It would
// be nice to find a common location for log related base tests when
// there are more uses of it.
class RegistrarTestBase : public TemporaryDirectoryTest
{
public:
RegistrarTestBase()
: log(NULL),
storage(NULL),
state(NULL),
replica2(NULL) {}
protected:
virtual void SetUp()
{
TemporaryDirectoryTest::SetUp();
// For initializing the replicas.
log::tool::Initialize initializer;
string path1 = os::getcwd() + "/.log1";
string path2 = os::getcwd() + "/.log2";
initializer.flags.path = path1;
initializer.execute();
initializer.flags.path = path2;
initializer.execute();
// Only create the replica for 'path2' (i.e., the second replica)
// as the first replica will be created when we create a Log.
replica2 = new Replica(path2);
set<UPID> pids;
pids.insert(replica2->pid());
log = new Log(2, path1, pids);
storage = new LogStorage(log);
state = new State(storage);
master.CopyFrom(protobuf::createMasterInfo(UPID("master@127.0.0.1:5050")));
SlaveID id;
id.set_value("1");
SlaveInfo info;
info.set_hostname("localhost");
info.mutable_id()->CopyFrom(id);
slave.CopyFrom(info);
}
virtual void TearDown()
{
delete state;
delete storage;
delete log;
delete replica2;
TemporaryDirectoryTest::TearDown();
}
Log* log;
Storage* storage;
State* state;
Replica* replica2;
MasterInfo master;
SlaveInfo slave;
Flags flags;
};
// Registrar fixture parameterized by a bool that controls whether the
// registrar runs in strict mode ('flags.registry_strict').
class RegistrarTest
  : public RegistrarTestBase,
    public WithParamInterface<bool>
{
protected:
  virtual void SetUp()
  {
    // Build the shared log-backed state first, then apply strictness.
    RegistrarTestBase::SetUp();

    const bool strict = GetParam();
    flags.registry_strict = strict;
  }
};
// The Registrar tests are parameterized by "strictness": every
// RegistrarTest runs once non-strict (instance /0) and once strict
// (instance /1).
INSTANTIATE_TEST_CASE_P(Strict, RegistrarTest, ::testing::Values(false, true));
// Operations issued before recover() fail. Operations issued after
// recover() starts, but before it finishes, succeed.
TEST_P(RegistrarTest, recover)
{
  Registrar registrar(flags, state);

  // Nothing has been recovered yet, so every operation is rejected.
  AWAIT_EXPECT_FAILED(
      registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
  AWAIT_EXPECT_FAILED(
      registrar.apply(Owned<Operation>(new ReadmitSlave(slave))));
  AWAIT_EXPECT_FAILED(
      registrar.apply(Owned<Operation>(new RemoveSlave(slave))));

  Future<Registry> recovered = registrar.recover(master);

  // Queue operations while recovery may still be in flight.
  Future<bool> admitted =
    registrar.apply(Owned<Operation>(new AdmitSlave(slave)));
  Future<bool> readmitted =
    registrar.apply(Owned<Operation>(new ReadmitSlave(slave)));
  Future<bool> removed =
    registrar.apply(Owned<Operation>(new RemoveSlave(slave)));

  AWAIT_READY(recovered);
  EXPECT_EQ(master, recovered.get().master().info());

  // All three queued operations must ultimately succeed.
  AWAIT_EQ(true, admitted);
  AWAIT_EQ(true, readmitted);
  AWAIT_EQ(true, removed);
}
// Admitting a new slave succeeds. Admitting it a second time is
// rejected only in strict mode.
TEST_P(RegistrarTest, admit)
{
  Registrar registrar(flags, state);
  AWAIT_READY(registrar.recover(master));

  AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(slave))));

  const bool duplicateAccepted = !flags.registry_strict;
  AWAIT_EQ(duplicateAccepted,
           registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
}
// Readmitting a previously admitted slave succeeds. Readmitting a
// slave the registry has never seen succeeds only in non-strict mode.
TEST_P(RegistrarTest, readmit)
{
  Registrar registrar(flags, state);
  AWAIT_READY(registrar.recover(master));

  SlaveInfo known;
  known.set_hostname("localhost");
  known.mutable_id()->set_value("1");

  SlaveInfo unknown;
  unknown.set_hostname("localhost");
  unknown.mutable_id()->set_value("2");

  AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(known))));
  AWAIT_EQ(true, registrar.apply(Owned<Operation>(new ReadmitSlave(known))));

  const bool unknownAccepted = !flags.registry_strict;
  AWAIT_EQ(unknownAccepted,
           registrar.apply(Owned<Operation>(new ReadmitSlave(unknown))));
}
// Removing an admitted slave succeeds. Removing one that is already
// gone succeeds only in non-strict mode. A removed slave can be
// admitted again.
TEST_P(RegistrarTest, remove)
{
  Registrar registrar(flags, state);
  AWAIT_READY(registrar.recover(master));

  // Three slaves on "localhost" with ids "1", "2" and "3".
  const char* ids[] = { "1", "2", "3" };
  const size_t count = sizeof(ids) / sizeof(ids[0]);

  vector<SlaveInfo> infos;
  for (size_t i = 0; i < count; ++i) {
    SlaveInfo info;
    info.set_hostname("localhost");
    info.mutable_id()->set_value(ids[i]);
    infos.push_back(info);
  }

  for (size_t i = 0; i < count; ++i) {
    AWAIT_EQ(true,
             registrar.apply(Owned<Operation>(new AdmitSlave(infos[i]))));
  }

  // Removing an already-removed slave is rejected only in strict mode.
  const bool repeatAccepted = !flags.registry_strict;

  // Slave "1": remove twice, then admit it again.
  AWAIT_EQ(true,
           registrar.apply(Owned<Operation>(new RemoveSlave(infos[0]))));
  AWAIT_EQ(repeatAccepted,
           registrar.apply(Owned<Operation>(new RemoveSlave(infos[0]))));
  AWAIT_EQ(true,
           registrar.apply(Owned<Operation>(new AdmitSlave(infos[0]))));

  // Slaves "2" and "3": remove each twice.
  for (size_t i = 1; i < count; ++i) {
    AWAIT_EQ(true,
             registrar.apply(Owned<Operation>(new RemoveSlave(infos[i]))));
    AWAIT_EQ(repeatAccepted,
             registrar.apply(Owned<Operation>(new RemoveSlave(infos[i]))));
  }
}
// A slave readmitted by one registrar (allowed only in non-strict mode)
// is visible to the next registrar that recovers from the same state.
TEST_P(RegistrarTest, bootstrap)
{
  const bool strict = flags.registry_strict;

  // First registrar: readmit a slave that was never admitted.
  {
    Registrar registrar(flags, state);
    AWAIT_READY(registrar.recover(master));

    AWAIT_EQ(!strict,
             registrar.apply(Owned<Operation>(new ReadmitSlave(slave))));
  }

  // Second registrar: recover and inspect what was persisted.
  {
    Registrar registrar(flags, state);
    Future<Registry> recovered = registrar.recover(master);
    AWAIT_READY(recovered);

    const Registry& registry = recovered.get();

    if (!strict) {
      ASSERT_EQ(1, registry.slaves().slaves().size());
      EXPECT_EQ(slave, registry.slaves().slaves(0).info());
    } else {
      EXPECT_EQ(0, registry.slaves().slaves().size());
    }
  }
}
// gmock implementation of state::Storage, letting tests script the
// futures returned for each storage call. 'std::set' is written out
// fully so it cannot be confused with the mocked 'set' member.
class MockStorage : public Storage
{
public:
  // Reads.
  MOCK_METHOD1(get, Future<Option<Entry> >(const string&));
  MOCK_METHOD0(names, Future<std::set<string> >(void));

  // Writes.
  MOCK_METHOD2(set, Future<bool>(const Entry&, const UUID&));
  MOCK_METHOD1(expunge, Future<bool>(const Entry&));
};
// If fetching the registry never completes, recovery fails once the
// fetch timeout elapses. The registrar then rejects further operations.
TEST_P(RegistrarTest, fetchTimeout)
{
  Clock::pause();

  MockStorage mockStorage;
  State mockState(&mockStorage);

  // Respond to the fetch with a default-constructed future that nothing
  // ever completes, and record that the fetch happened.
  Future<Nothing> fetchIssued;
  EXPECT_CALL(mockStorage, get(_))
    .WillOnce(DoAll(FutureSatisfy(&fetchIssued),
                    Return(Future<Option<Entry> >())));

  Registrar registrar(flags, &mockState);
  Future<Registry> recovering = registrar.recover(master);

  // With the fetch outstanding, move the clock past the fetch timeout.
  AWAIT_READY(fetchIssued);
  Clock::advance(flags.registry_fetch_timeout);
  AWAIT_FAILED(recovering);

  Clock::resume();

  // Ensure the registrar fails subsequent operations.
  AWAIT_FAILED(registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
}
// If storing the registry during recovery never completes, recovery
// fails once the store timeout elapses. The registrar then rejects
// further operations.
TEST_P(RegistrarTest, storeTimeout)
{
  Clock::pause();

  MockStorage mockStorage;
  State mockState(&mockStorage);

  Registrar registrar(flags, &mockState);

  // No registry is stored yet.
  EXPECT_CALL(mockStorage, get(_))
    .WillOnce(Return(None()));

  // The store is answered with a default-constructed future that nothing
  // ever completes. We record that the store happened.
  Future<Nothing> storeIssued;
  EXPECT_CALL(mockStorage, set(_, _))
    .WillOnce(DoAll(FutureSatisfy(&storeIssued),
                    Return(Future<bool>())));

  Future<Registry> recovering = registrar.recover(master);

  // With the store outstanding, move the clock past the store timeout.
  AWAIT_READY(storeIssued);
  Clock::advance(flags.registry_store_timeout);
  AWAIT_FAILED(recovering);

  Clock::resume();

  // Ensure the registrar fails subsequent operations.
  AWAIT_FAILED(registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
}
// A storage failure while applying an operation aborts the registrar.
// After that, every operation fails, even though later stores would
// succeed.
TEST_P(RegistrarTest, abort)
{
  MockStorage mockStorage;
  State mockState(&mockStorage);
  Registrar registrar(flags, &mockState);

  EXPECT_CALL(mockStorage, get(_))
    .WillOnce(Return(None()));

  // Store #1 (recovery) succeeds, store #2 fails, and every later
  // store would succeed.
  EXPECT_CALL(mockStorage, set(_, _))
    .WillOnce(Return(Future<bool>(true)))
    .WillOnce(Return(Future<bool>::failed("failure")))
    .WillRepeatedly(Return(Future<bool>(true)));

  AWAIT_READY(registrar.recover(master));

  // This operation hits the failing store.
  Future<bool> failedApply =
    registrar.apply(Owned<Operation>(new AdmitSlave(slave)));
  AWAIT_FAILED(failedApply);

  // The registrar is now aborted, so this fails as well.
  Future<bool> afterAbort =
    registrar.apply(Owned<Operation>(new AdmitSlave(slave)));
  AWAIT_FAILED(afterAbort);
}
class Registrar_BENCHMARK_Test : public RegistrarTestBase,
public WithParamInterface<size_t>
{};
// The Registrar benchmark tests are parameterized by the number of slaves.
static const size_t REGISTRAR_BENCHMARK_SLAVE_COUNTS[] = {
  10000U, 20000U, 30000U, 50000U
};

INSTANTIATE_TEST_CASE_P(
    SlaveCount,
    Registrar_BENCHMARK_Test,
    ::testing::ValuesIn(REGISTRAR_BENCHMARK_SLAVE_COUNTS));
// Benchmarks the registrar with GetParam() slaves. Times and logs four
// phases:
//   1. admitting all slaves;
//   2. readmitting them in shuffled order;
//   3. recovering them with a second registrar over the same state;
//   4. removing them in shuffled order.
TEST_P(Registrar_BENCHMARK_Test, performance)
{
  Registrar registrar(flags, state);
  AWAIT_READY(registrar.recover(master));

  vector<SlaveInfo> infos;

  Attributes attributes = Attributes::parse("foo:bar;baz:quux");
  Resources resources =
    Resources::parse("cpus(*):1.0;mem(*):512;disk(*):2048").get();

  size_t slaveCount = GetParam();

  // The final size is known, so allocate once instead of growing.
  infos.reserve(slaveCount);

  // Create slaves.
  for (size_t i = 0; i < slaveCount; ++i) {
    // Simulate real slave information.
    SlaveInfo info;
    info.set_hostname("localhost");
    info.mutable_id()->set_value(
        string("201310101658-2280333834-5050-48574-") + stringify(i));
    info.mutable_resources()->MergeFrom(resources);
    info.mutable_attributes()->MergeFrom(attributes);
    infos.push_back(info);
  }

  // Admit slaves. Only the last result is awaited. This assumes the
  // registrar completes operations in submission order.
  Stopwatch watch;
  watch.start();

  Future<bool> result;
  foreach (const SlaveInfo& info, infos) {
    result = registrar.apply(Owned<Operation>(new AdmitSlave(info)));
  }
  AWAIT_READY_FOR(result, Minutes(5));

  LOG(INFO) << "Admitted " << slaveCount << " slaves in " << watch.elapsed();

  // Shuffle the slaves so we are readmitting them in random order (same
  // as in production).
  std::random_shuffle(infos.begin(), infos.end());

  // Readmit slaves.
  watch.start();

  foreach (const SlaveInfo& info, infos) {
    result = registrar.apply(Owned<Operation>(new ReadmitSlave(info)));
  }
  AWAIT_READY_FOR(result, Minutes(5));

  LOG(INFO) << "Readmitted " << slaveCount << " slaves in " << watch.elapsed();

  // Recover slaves with a second registrar over the same state. The
  // master description is named 'masterInfo' so that the 'info' loop
  // variables below do not shadow it.
  Registrar registrar2(flags, state);
  watch.start();

  MasterInfo masterInfo;
  masterInfo.set_id("master");
  masterInfo.set_ip(10000000);
  masterInfo.set_port(5050);

  Future<Registry> registry = registrar2.recover(masterInfo);
  AWAIT_READY(registry);

  LOG(INFO) << "Recovered " << slaveCount << " slaves ("
            << Bytes(registry.get().ByteSize()) << ") in " << watch.elapsed();

  // Shuffle the slaves so we are removing them in random order (same
  // as in production).
  std::random_shuffle(infos.begin(), infos.end());

  // Remove slaves.
  watch.start();

  foreach (const SlaveInfo& info, infos) {
    result = registrar2.apply(Owned<Operation>(new RemoveSlave(info)));
  }
  AWAIT_READY_FOR(result, Minutes(5));

  LOG(INFO) << "Removed " << slaveCount << " slaves in " << watch.elapsed();
}
} // namespace master {
} // namespace internal {
} // namespace mesos {