// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "tests/cluster.hpp"
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/id.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include "master/allocator/mesos/hierarchical.hpp"
namespace mesos {
namespace internal {
namespace tests {

Cluster::Masters::Masters(
    Cluster* _cluster,
    const Option<zookeeper::URL>& _url)
  : cluster(_cluster),
    url(_url) {}


Cluster::Masters::~Masters()
{
  shutdown();
}
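

// Stops and cleans up every master started via this
// Cluster::Masters instance.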
void Cluster::Masters::shutdown()
{
  // TODO(benh): Use utils::copy from stout once namespaced.
  std::map<process::PID<master::Master>, Master> copy(masters);
  foreachkey (const process::PID<master::Master>& pid, copy) {
    stop(pid);
  }
  masters.clear();
}
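

// Starts a new master with the given flags, creating defaults for
// any dependencies that are not explicitly provided.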
Try<process::PID<master::Master>> Cluster::Masters::start(
    const master::Flags& flags,
    const Option<mesos::master::allocator::Allocator*>& allocator,
    const Option<Authorizer*>& authorizer,
    const Option<std::shared_ptr<process::RateLimiter>>& slaveRemovalLimiter)
{
  // Disallow multiple masters when not using ZooKeeper.
  if (!masters.empty() && url.isNone()) {
    return Error("Cannot start multiple masters when not using ZooKeeper");
  }

  Master master;

  if (allocator.isSome()) {
    master.allocator = allocator.get();
  } else {
    // If an allocator is not provided, fall back to the default one,
    // managed by Cluster::Masters.
    Try<mesos::master::allocator::Allocator*> allocator_ =
      master::allocator::HierarchicalDRFAllocator::create();

    if (allocator_.isError()) {
      return Error(
          "Failed to create an instance of HierarchicalDRFAllocator: " +
          allocator_.error());
    }

    master.allocator = allocator_.get();
    master.createdAllocator = true;
  }
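
  // Set up storage for the registry: in-memory, or a replicated log
  // (optionally coordinated through ZooKeeper), depending on the flags.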
  if (flags.registry == "in_memory") {
    if (flags.registry_strict) {
      return Error(
          "Cannot use '--registry_strict' when using in-memory storage based"
          " registry");
    }
    master.storage.reset(new state::InMemoryStorage());
  } else if (flags.registry == "replicated_log") {
    if (flags.work_dir.isNone()) {
      return Error(
          "Need to specify --work_dir for replicated log based registry");
    }

    if (url.isSome()) {
      // Use ZooKeeper based replicated log.
      if (flags.quorum.isNone()) {
        return Error(
            "Need to specify --quorum for replicated log based registry"
            " when using ZooKeeper");
      }

      master.log.reset(new log::Log(
          flags.quorum.get(),
          path::join(flags.work_dir.get(), "replicated_log"),
          url.get().servers,
          flags.zk_session_timeout,
          path::join(url.get().path, "log_replicas"),
          url.get().authentication,
          flags.log_auto_initialize));
    } else {
      master.log.reset(new log::Log(
          1,
          path::join(flags.work_dir.get(), "replicated_log"),
          std::set<process::UPID>(),
          flags.log_auto_initialize));
    }

    master.storage.reset(new state::LogStorage(master.log.get()));
  } else {
    return Error("'" + flags.registry + "' is not a supported option for"
                 " registry persistence");
  }

  CHECK_NOTNULL(master.storage.get());

  master.state.reset(new state::protobuf::State(master.storage.get()));
  master.registrar.reset(new master::Registrar(flags, master.state.get()));
  master.repairer.reset(new master::Repairer());

  if (url.isSome()) {
    master.contender.reset(new ZooKeeperMasterContender(url.get()));
    master.detector.reset(new ZooKeeperMasterDetector(url.get()));
  } else {
    master.contender.reset(new StandaloneMasterContender());
    master.detector.reset(new StandaloneMasterDetector());
  }
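
  // Use the provided authorizer if there is one; otherwise create and
  // initialize a local authorizer when ACLs have been specified.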
  if (authorizer.isSome()) {
    CHECK_NOTNULL(authorizer.get());
  } else if (flags.acls.isSome()) {
    Try<Authorizer*> local = Authorizer::create(master::DEFAULT_AUTHORIZER);
    if (local.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to instantiate the local authorizer: "
        << local.error();
    }

    Try<Nothing> initialized = local.get()->initialize(flags.acls.get());
    if (initialized.isError()) {
      return Error("Failed to initialize the authorizer: " +
                   initialized.error() + " (see --acls flag)");
    }

    master.authorizer.reset(local.get());
  }

  if (slaveRemovalLimiter.isNone() &&
      flags.slave_removal_rate_limit.isSome()) {
    // Parse the flag value.
    // TODO(vinod): Move this parsing logic to flags once we have a
    // 'Rate' abstraction in stout.
    std::vector<std::string> tokens =
      strings::tokenize(flags.slave_removal_rate_limit.get(), "/");

    if (tokens.size() != 2) {
      EXIT(1) << "Invalid slave_removal_rate_limit: "
              << flags.slave_removal_rate_limit.get()
              << ". Format is <Number of slaves>/<Duration>";
    }

    Try<int> permits = numify<int>(tokens[0]);
    if (permits.isError()) {
      EXIT(1) << "Invalid slave_removal_rate_limit: "
              << flags.slave_removal_rate_limit.get()
              << ". Format is <Number of slaves>/<Duration>"
              << ": " << permits.error();
    }

    Try<Duration> duration = Duration::parse(tokens[1]);
    if (duration.isError()) {
      EXIT(1) << "Invalid slave_removal_rate_limit: "
              << flags.slave_removal_rate_limit.get()
              << ". Format is <Number of slaves>/<Duration>"
              << ": " << duration.error();
    }

    master.slaveRemovalLimiter = std::shared_ptr<process::RateLimiter>(
        new process::RateLimiter(permits.get(), duration.get()));
  }
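
  // Construct the master, wiring in either the provided dependencies
  // or the defaults created above.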
  master.master = new master::Master(
      master.allocator,
      master.registrar.get(),
      master.repairer.get(),
      &cluster->files,
      master.contender.get(),
      master.detector.get(),
      authorizer.isSome() ? authorizer : master.authorizer.get(),
      slaveRemovalLimiter.isSome()
        ? slaveRemovalLimiter
        : master.slaveRemovalLimiter,
      flags);

  if (url.isNone()) {
    // This means we are using the StandaloneMasterDetector.
    StandaloneMasterDetector* detector_ = CHECK_NOTNULL(
        dynamic_cast<StandaloneMasterDetector*>(master.detector.get()));

    detector_->appoint(master.master->info());
  }

  process::Future<Nothing> _recover =
    FUTURE_DISPATCH(master.master->self(), &master::Master::_recover);

  process::PID<master::Master> pid = process::spawn(master.master);

  masters[pid] = master;

  // Speed up the tests by ensuring that the Master is recovered
  // before the test proceeds. Otherwise, authentication and
  // registration messages may be dropped, causing delayed retries.
  // NOTE: We use process::internal::await() to avoid awaiting a
  // Future forever when the Clock is paused.
  if (!process::internal::await(
          _recover,
          flags.registry_fetch_timeout + flags.registry_store_timeout)) {
    LOG(FATAL) << "Failed to wait for _recover";
  }

  bool paused = process::Clock::paused();

  // Need to settle the Clock to ensure that the Master finishes
  // executing _recover() before we return.
  process::Clock::pause();
  process::Clock::settle();

  // Return the Clock to its original state.
  if (!paused) {
    process::Clock::resume();
  }

  return pid;
}
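

// Stops the master identified by 'pid', waits for it to terminate,
// and deletes any objects created on its behalf.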
Try<Nothing> Cluster::Masters::stop(
    const process::PID<master::Master>& pid)
{
  if (masters.count(pid) == 0) {
    return Error("No master found to stop");
  }

  Master master = masters[pid];

  process::terminate(master.master);
  process::wait(master.master);
  delete master.master;

  if (master.createdAllocator) {
    delete master.allocator;
  }

  masters.erase(pid);

  return Nothing();
}
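

// Returns a new detector for the cluster's master(s): a ZooKeeper
// based detector when a URL was given, otherwise a standalone
// detector appointed to the single running master.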
process::Owned<MasterDetector> Cluster::Masters::detector()
{
  if (url.isSome()) {
    return process::Owned<MasterDetector>(
        new ZooKeeperMasterDetector(url.get()));
  }

  CHECK(masters.size() == 1);

  return process::Owned<MasterDetector>(
      new StandaloneMasterDetector(masters.begin()->first));
}


Cluster::Slaves::Slaves(Cluster* _cluster, Masters* _masters)
  : cluster(_cluster), masters(_masters) {}


Cluster::Slaves::~Slaves()
{
  shutdown();
}
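

// Stops and cleans up every slave started via this Cluster::Slaves
// instance, first destroying any containers still running on it.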
void Cluster::Slaves::shutdown()
{
  // TODO(benh): Use utils::copy from stout once namespaced.
  std::map<process::PID<slave::Slave>, Slave> copy(slaves);
  foreachpair (const process::PID<slave::Slave>& pid,
               const Slave& slave,
               copy) {
    // Destroy the existing containers on the slave. Note that some
    // containers may terminate while we are doing this, so we ignore
    // any 'wait' failures and ensure that there are no containers
    // when we're done destroying.
    process::Future<hashset<ContainerID>> containers =
      slave.containerizer->containers();
    AWAIT_READY(containers);

    foreach (const ContainerID& containerId, containers.get()) {
      process::Future<containerizer::Termination> wait =
        slave.containerizer->wait(containerId);

      slave.containerizer->destroy(containerId);
      AWAIT(wait);
    }

    containers = slave.containerizer->containers();
    AWAIT_READY(containers);

    ASSERT_TRUE(containers.get().empty())
      << "Failed to destroy containers: " << stringify(containers.get());

    stop(pid);
  }
  slaves.clear();
}
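

// Starts a new slave with the given flags, creating defaults for any
// dependencies that are not explicitly provided.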
Try<process::PID<slave::Slave>> Cluster::Slaves::start(
    const slave::Flags& flags,
    const Option<std::string>& id,
    const Option<slave::Containerizer*>& containerizer,
    const Option<MasterDetector*>& detector,
    const Option<slave::GarbageCollector*>& gc,
    const Option<slave::StatusUpdateManager*>& statusUpdateManager,
    const Option<mesos::slave::ResourceEstimator*>& resourceEstimator,
    const Option<mesos::slave::QoSController*>& qosController)
{
  // TODO(benh): Create a work directory if using the default.

  Slave slave;

  if (containerizer.isSome()) {
    slave.containerizer = containerizer.get();
  } else {
    // Create a new fetcher.
    slave.fetcher.reset(new slave::Fetcher());

    Try<slave::Containerizer*> containerizer =
      slave::Containerizer::create(flags, true, slave.fetcher.get());
    CHECK_SOME(containerizer);

    slave.containerizer = containerizer.get();
    slave.createdContainerizer = true;
  }
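
  // Create a resource estimator if one wasn't provided.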
  if (resourceEstimator.isNone()) {
    Try<mesos::slave::ResourceEstimator*> _resourceEstimator =
      mesos::slave::ResourceEstimator::create(flags.resource_estimator);

    CHECK_SOME(_resourceEstimator);
    slave.resourceEstimator.reset(_resourceEstimator.get());
  }
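
  // Create a QoS controller if one wasn't provided.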
  if (qosController.isNone()) {
    Try<mesos::slave::QoSController*> _qosController =
      mesos::slave::QoSController::create(flags.qos_controller);

    CHECK_SOME(_qosController);
    slave.qosController.reset(_qosController.get());
  }

  // Get a detector for the master(s) if one wasn't provided.
  if (detector.isNone()) {
    slave.detector = masters->detector();
  }

  // Create a garbage collector if one wasn't provided.
  if (gc.isNone()) {
    slave.gc.reset(new slave::GarbageCollector());
  }

  // Create a status update manager if one wasn't provided.
  if (statusUpdateManager.isNone()) {
    slave.statusUpdateManager.reset(new slave::StatusUpdateManager(flags));
  }

  slave.flags = flags;

  slave.slave = new slave::Slave(
      id.isSome() ? id.get() : process::ID::generate("slave"),
      flags,
      detector.getOrElse(slave.detector.get()),
      slave.containerizer,
      &cluster->files,
      gc.getOrElse(slave.gc.get()),
      statusUpdateManager.getOrElse(slave.statusUpdateManager.get()),
      resourceEstimator.getOrElse(slave.resourceEstimator.get()),
      qosController.getOrElse(slave.qosController.get()));

  process::PID<slave::Slave> pid = process::spawn(slave.slave);

  slaves[pid] = slave;

  return pid;
}
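

// Stops the slave identified by 'pid', either shutting it down
// gracefully or terminating it, and cleans up anything created on
// its behalf.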
Try<Nothing> Cluster::Slaves::stop(
    const process::PID<slave::Slave>& pid,
    bool shutdown)
{
  if (slaves.count(pid) == 0) {
    return Error("No slave found to stop");
  }

  Slave slave = slaves[pid];

  if (shutdown) {
    process::dispatch(slave.slave,
                      &slave::Slave::shutdown,
                      process::UPID(),
                      "");
  } else {
    process::terminate(slave.slave);
  }
  process::wait(slave.slave);
  delete slave.slave;

  if (slave.createdContainerizer) {
    delete slave.containerizer;
  }

#ifdef __linux__
  // Move all of this process's threads back into the root cgroup. This
  // simulates the slave process terminating and permits a new slave to
  // start when the --slave_subsystems flag is used.
  if (slave.flags.slave_subsystems.isSome()) {
    foreach (const std::string& subsystem,
             strings::tokenize(slave.flags.slave_subsystems.get(), ",")) {
      std::string hierarchy = path::join(
          slave.flags.cgroups_hierarchy, subsystem);

      std::string cgroup = path::join(slave.flags.cgroups_root, "slave");

      Try<bool> exists = cgroups::exists(hierarchy, cgroup);
      if (exists.isError() || !exists.get()) {
        EXIT(1) << "Failed to find cgroup " << cgroup
                << " for subsystem " << subsystem
                << " under hierarchy " << hierarchy
                << " for slave: " + exists.error();
      }

      // Move all of our threads into the root cgroup.
      Try<Nothing> assign = cgroups::assign(hierarchy, "", getpid());
      if (assign.isError()) {
        EXIT(1) << "Failed to move slave threads into cgroup " << cgroup
                << " for subsystem " << subsystem
                << " under hierarchy " << hierarchy
                << " for slave: " + assign.error();
      }
    }
  }
#endif // __linux__

  slaves.erase(pid);

  return Nothing();
}

} // namespace tests {
} // namespace internal {
} // namespace mesos {