blob: 7e022cfebdbf988354a20645b5bff30e83ad076e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <list>
#include <set>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/log/log.hpp>
#include <mesos/state/in_memory.hpp>
#include <mesos/state/leveldb.hpp>
#include <mesos/state/log.hpp>
#include <mesos/state/protobuf.hpp>
#include <mesos/state/storage.hpp>
#include <mesos/state/zookeeper.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/protobuf.hpp>
#include <process/pid.hpp>
#include <stout/gtest.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/try.hpp>
#include <stout/tests/utils.hpp>
#include "log/replica.hpp"
#include "log/tool/initialize.hpp"
#include "master/registry.hpp"
#include "messages/state.hpp"
#ifdef MESOS_HAS_JAVA
#include "tests/zookeeper.hpp"
#endif
using namespace mesos::internal::log;
using namespace process;
using std::list;
using std::set;
using std::string;
using std::vector;
using mesos::log::Log;
using mesos::state::Storage;
using mesos::state::LevelDBStorage;
#ifdef MESOS_HAS_JAVA
using mesos::state::ZooKeeperStorage;
#endif
using mesos::state::protobuf::State;
using mesos::state::protobuf::Variable;
namespace mesos {
namespace internal {
namespace tests {
typedef mesos::internal::Registry::Slaves Slaves;
typedef mesos::internal::Registry::Slave Slave;
// We declare this here to avoid collision with the top-level
// `mesos::Operation` protobuf message
using mesos::internal::state::Operation;
void FetchAndStoreAndFetch(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
variable = future1.get();
Slaves slaves2 = variable.get();
ASSERT_EQ(1, slaves2.slaves().size());
EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname());
}
void FetchAndStoreAndStoreAndFetch(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
variable = future2->get();
future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
variable = future1.get();
Slaves slaves2 = variable.get();
ASSERT_EQ(1, slaves2.slaves().size());
EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname());
}
void FetchAndStoreAndStoreFailAndFetch(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable1 = future1.get();
Slaves slaves1 = variable1.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave1 = slaves1.add_slaves();
slave1->mutable_info()->set_hostname("localhost1");
Variable<Slaves> variable2 = variable1.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable2);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
Slaves slaves2 = variable1.get();
ASSERT_TRUE(slaves2.slaves().empty());
Slave* slave2 = slaves2.add_slaves();
slave2->mutable_info()->set_hostname("localhost2");
variable2 = variable1.mutate(slaves2);
future2 = state->store(variable2);
AWAIT_READY(future2);
EXPECT_NONE(future2.get());
future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
variable1 = future1.get();
slaves1 = variable1.get();
ASSERT_EQ(1, slaves1.slaves().size());
EXPECT_EQ("localhost1", slaves1.slaves(0).info().hostname());
}
void FetchAndStoreAndExpungeAndFetch(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
variable = future2->get();
Future<bool> future3 = state->expunge(variable);
AWAIT_READY(future3);
ASSERT_TRUE(future3.get());
future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
variable = future1.get();
Slaves slaves2 = variable.get();
ASSERT_TRUE(slaves2.slaves().empty());
}
void FetchAndStoreAndExpungeAndExpunge(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
variable = future2->get();
Future<bool> future3 = state->expunge(variable);
AWAIT_READY(future3);
ASSERT_TRUE(future3.get());
future3 = state->expunge(variable);
AWAIT_READY(future3);
ASSERT_FALSE(future3.get());
}
void FetchAndStoreAndExpungeAndStoreAndFetch(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
variable = future2->get();
Future<bool> future3 = state->expunge(variable);
AWAIT_READY(future3);
ASSERT_TRUE(future3.get());
future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
variable = future1.get();
Slaves slaves2 = variable.get();
ASSERT_EQ(1, slaves2.slaves().size());
EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname());
}
void Names(State* state)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
Future<set<string>> names = state->names();
AWAIT_READY(names);
ASSERT_EQ(1u, names->size());
EXPECT_NE(names->find("slaves"), names->end());
}
class InMemoryStateTest : public ::testing::Test
{
public:
InMemoryStateTest()
: storage(nullptr),
state(nullptr) {}
protected:
void SetUp() override
{
storage = new mesos::state::InMemoryStorage();
state = new State(storage);
}
void TearDown() override
{
delete state;
delete storage;
}
mesos::state::Storage* storage;
State* state;
};
TEST_F(InMemoryStateTest, FetchAndStoreAndFetch)
{
FetchAndStoreAndFetch(state);
}
TEST_F(InMemoryStateTest, FetchAndStoreAndStoreAndFetch)
{
FetchAndStoreAndStoreAndFetch(state);
}
TEST_F(InMemoryStateTest, FetchAndStoreAndStoreFailAndFetch)
{
FetchAndStoreAndStoreFailAndFetch(state);
}
TEST_F(InMemoryStateTest, FetchAndStoreAndExpungeAndFetch)
{
FetchAndStoreAndExpungeAndFetch(state);
}
TEST_F(InMemoryStateTest, FetchAndStoreAndExpungeAndExpunge)
{
FetchAndStoreAndExpungeAndExpunge(state);
}
TEST_F(InMemoryStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
{
FetchAndStoreAndExpungeAndStoreAndFetch(state);
}
TEST_F(InMemoryStateTest, Names)
{
Names(state);
}
class LevelDBStateTest : public TemporaryDirectoryTest
{
public:
LevelDBStateTest()
: storage(nullptr),
state(nullptr) {}
protected:
void SetUp() override
{
TemporaryDirectoryTest::SetUp();
ASSERT_SOME(sandbox);
path = sandbox.get() + "/.state";
storage = new mesos::state::LevelDBStorage(path);
state = new State(storage);
}
void TearDown() override
{
delete state;
delete storage;
TemporaryDirectoryTest::TearDown();
}
mesos::state::Storage* storage;
State* state;
private:
string path;
};
TEST_F(LevelDBStateTest, FetchAndStoreAndFetch)
{
FetchAndStoreAndFetch(state);
}
TEST_F(LevelDBStateTest, FetchAndStoreAndStoreAndFetch)
{
FetchAndStoreAndStoreAndFetch(state);
}
TEST_F(LevelDBStateTest, FetchAndStoreAndStoreFailAndFetch)
{
FetchAndStoreAndStoreFailAndFetch(state);
}
TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndFetch)
{
FetchAndStoreAndExpungeAndFetch(state);
}
TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndExpunge)
{
FetchAndStoreAndExpungeAndExpunge(state);
}
TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
{
FetchAndStoreAndExpungeAndStoreAndFetch(state);
}
TEST_F(LevelDBStateTest, Names)
{
Names(state);
}
class LogStateTest : public TemporaryDirectoryTest
{
public:
LogStateTest()
: storage(nullptr),
state(nullptr),
replica2(nullptr),
log(nullptr) {}
protected:
void SetUp() override
{
TemporaryDirectoryTest::SetUp();
// For initializing the replicas.
tool::Initialize initializer;
string path1 = os::getcwd() + "/.log1";
string path2 = os::getcwd() + "/.log2";
initializer.flags.path = path1;
initializer.execute();
initializer.flags.path = path2;
initializer.execute();
// Only create the replica for 'path2' (i.e., the second replica)
// as the first replica will be created when we create a Log.
replica2 = new Replica(path2);
set<UPID> pids;
pids.insert(replica2->pid());
log = new Log(2, path1, pids);
storage = new mesos::state::LogStorage(log, 1024);
state = new State(storage);
}
void TearDown() override
{
delete state;
delete storage;
delete log;
delete replica2;
TemporaryDirectoryTest::TearDown();
}
mesos::state::Storage* storage;
State* state;
Replica* replica2;
Log* log;
};
TEST_F(LogStateTest, FetchAndStoreAndFetch)
{
FetchAndStoreAndFetch(state);
}
TEST_F(LogStateTest, FetchAndStoreAndStoreAndFetch)
{
FetchAndStoreAndStoreAndFetch(state);
}
TEST_F(LogStateTest, FetchAndStoreAndStoreFailAndFetch)
{
FetchAndStoreAndStoreFailAndFetch(state);
}
TEST_F(LogStateTest, FetchAndStoreAndExpungeAndFetch)
{
FetchAndStoreAndExpungeAndFetch(state);
}
TEST_F(LogStateTest, FetchAndStoreAndExpungeAndExpunge)
{
FetchAndStoreAndExpungeAndExpunge(state);
}
TEST_F(LogStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
{
FetchAndStoreAndExpungeAndStoreAndFetch(state);
}
TEST_F(LogStateTest, Names)
{
Names(state);
}
Future<Option<Variable<Slaves>>> timeout(
Future<Option<Variable<Slaves>>> future)
{
future.discard();
return Failure("Timeout");
}
TEST_F(LogStateTest, Timeout)
{
Clock::pause();
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves1 = variable.get();
ASSERT_TRUE(slaves1.slaves().empty());
Slave* slave = slaves1.add_slaves();
slave->mutable_info()->set_hostname("localhost");
variable = variable.mutate(slaves1);
// Now terminate the replica so the store will timeout.
terminate(replica2->pid());
wait(replica2->pid());
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
Future<Option<Variable<Slaves>>> future3 =
future2.after(Seconds(5), lambda::bind(&timeout, lambda::_1));
ASSERT_TRUE(future2.isPending());
ASSERT_TRUE(future3.isPending());
Clock::advance(Seconds(5));
AWAIT_DISCARDED(future2);
AWAIT_FAILED(future3);
Clock::resume();
}
TEST_F(LogStateTest, Diff)
{
Future<Variable<Slaves>> future1 = state->fetch<Slaves>("slaves");
AWAIT_READY(future1);
Variable<Slaves> variable = future1.get();
Slaves slaves = variable.get();
ASSERT_TRUE(slaves.slaves().empty());
for (size_t i = 0; i < 1024; i++) {
Slave* slave = slaves.add_slaves();
slave->mutable_info()->set_hostname("localhost" + stringify(i));
}
variable = variable.mutate(slaves);
Future<Option<Variable<Slaves>>> future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
variable = future2->get();
Slave* slave = slaves.add_slaves();
slave->mutable_info()->set_hostname("localhost1024");
variable = variable.mutate(slaves);
future2 = state->store(variable);
AWAIT_READY(future2);
ASSERT_SOME(future2.get());
// It's possible that we're doing truncation asynchronously which
// will cause the test to fail because we'll end up getting a
// pending position from Log::Reader::ending which will cause
// Log::Reader::read to fail. To remedy this, we pause the clock and
// wait for all executing processe to settle.
Clock::pause();
Clock::settle();
Clock::resume();
Log::Reader reader(log);
Future<Log::Position> beginning = reader.beginning();
Future<Log::Position> ending = reader.ending();
AWAIT_READY(beginning);
AWAIT_READY(ending);
Future<list<Log::Entry>> entries = reader.read(beginning.get(), ending.get());
AWAIT_READY(entries);
// Convert each Log::Entry to an Operation.
vector<Operation> operations;
foreach (const Log::Entry& entry, entries.get()) {
// Parse the Operation from the Log::Entry.
Operation operation;
google::protobuf::io::ArrayInputStream stream(
entry.data.data(),
entry.data.size());
ASSERT_TRUE(operation.ParseFromZeroCopyStream(&stream));
operations.push_back(operation);
}
ASSERT_EQ(2u, operations.size());
EXPECT_EQ(Operation::SNAPSHOT, operations[0].type());
EXPECT_EQ(Operation::DIFF, operations[1].type());
}
#ifdef MESOS_HAS_JAVA
class ZooKeeperStateTest : public tests::ZooKeeperTest
{
public:
ZooKeeperStateTest()
: storage(nullptr),
state(nullptr) {}
protected:
void SetUp() override
{
ZooKeeperTest::SetUp();
storage = new mesos::state::ZooKeeperStorage(
server->connectString(),
NO_TIMEOUT,
"/state/");
state = new State(storage);
}
void TearDown() override
{
delete state;
delete storage;
ZooKeeperTest::TearDown();
}
mesos::state::Storage* storage;
State* state;
};
TEST_F(ZooKeeperStateTest, FetchAndStoreAndFetch)
{
FetchAndStoreAndFetch(state);
}
TEST_F(ZooKeeperStateTest, FetchAndStoreAndStoreAndFetch)
{
FetchAndStoreAndStoreAndFetch(state);
}
TEST_F(ZooKeeperStateTest, FetchAndStoreAndStoreFailAndFetch)
{
FetchAndStoreAndStoreFailAndFetch(state);
}
TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndFetch)
{
FetchAndStoreAndExpungeAndFetch(state);
}
TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndExpunge)
{
FetchAndStoreAndExpungeAndExpunge(state);
}
TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
{
FetchAndStoreAndExpungeAndStoreAndFetch(state);
}
TEST_F(ZooKeeperStateTest, Names)
{
Names(state);
}
#endif // MESOS_HAS_JAVA
} // namespace tests {
} // namespace internal {
} // namespace mesos {