| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef __TESTS_CLUSTER_HPP__ |
| #define __TESTS_CLUSTER_HPP__ |
| |
| #include <map> |
| |
| #include <mesos/mesos.hpp> |
| |
| #include <process/clock.hpp> |
| #include <process/future.hpp> |
| #include <process/gmock.hpp> |
| #include <process/gtest.hpp> |
| #include <process/owned.hpp> |
| #include <process/pid.hpp> |
| #include <process/process.hpp> |
| |
| #include <stout/error.hpp> |
| #include <stout/foreach.hpp> |
| #include <stout/none.hpp> |
| #include <stout/nothing.hpp> |
| #include <stout/option.hpp> |
| #include <stout/path.hpp> |
| #include <stout/strings.hpp> |
| #include <stout/try.hpp> |
| |
| #include "files/files.hpp" |
| |
| #ifdef __linux__ |
| #include "linux/cgroups.hpp" |
| #endif // __linux__ |
| |
| #include "authorizer/authorizer.hpp" |
| |
| #include "log/log.hpp" |
| |
| #include "log/tool/initialize.hpp" |
| |
| #include "master/allocator.hpp" |
| #include "master/contender.hpp" |
| #include "master/detector.hpp" |
| #include "master/hierarchical_allocator_process.hpp" |
| #include "master/flags.hpp" |
| #include "master/master.hpp" |
| #include "master/registrar.hpp" |
| #include "master/repairer.hpp" |
| |
| #include "slave/flags.hpp" |
| #include "slave/gc.hpp" |
| #include "slave/containerizer/containerizer.hpp" |
| #include "slave/slave.hpp" |
| #include "slave/status_update_manager.hpp" |
| |
| #include "state/in_memory.hpp" |
| #include "state/log.hpp" |
| #include "state/protobuf.hpp" |
| #include "state/storage.hpp" |
| |
| #include "zookeeper/url.hpp" |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
// An in-process test cluster: a collection of masters and slaves that
// share cluster-wide abstractions (e.g. 'files'). Tests use this to
// start and stop master/slave instances with injected dependencies.
class Cluster
{
public:
  // If 'url' is provided, ZooKeeper is used for master contention and
  // detection; otherwise standalone contender/detector instances are
  // used, which limits the cluster to a single master (see
  // Masters::start below).
  Cluster(const Option<zookeeper::URL>& url = None())
    : masters(this, url),
      slaves(this, &masters) {}

  // Abstracts the masters of a cluster.
  class Masters
  {
  public:
    Masters(Cluster* _cluster, const Option<zookeeper::URL>& _url);
    ~Masters();

    // Stops and cleans up all masters.
    void shutdown();

    // Start a new master with the provided flags and injections.
    // Injected pointers ('allocator', 'authorizer') remain owned by
    // the caller; defaults are created (and owned) here otherwise.
    Try<process::PID<master::Master> > start(
        const master::Flags& flags = master::Flags(),
        const Option<master::allocator::AllocatorProcess*>& allocator = None(),
        const Option<Authorizer*>& authorizer = None());

    // Stops and cleans up a master at the specified PID.
    Try<Nothing> stop(const process::PID<master::Master>& pid);

    // Returns a new master detector for this instance of masters.
    process::Owned<MasterDetector> detector();

  private:
    // Not copyable, not assignable.
    Masters(const Masters&);
    Masters& operator = (const Masters&);

    Cluster* cluster; // Enclosing class.
    Option<zookeeper::URL> url;

    // Encapsulates a single master's dependencies.
    struct Master
    {
      Master() : master(NULL) {}

      // Only set when the allocator was not injected (see start()).
      process::Owned<master::allocator::AllocatorProcess> allocatorProcess;
      process::Owned<master::allocator::Allocator> allocator;

      // Registry persistence: 'log' is only set for the
      // "replicated_log" registry (see start()).
      process::Owned<log::Log> log;
      process::Owned<state::Storage> storage;
      process::Owned<state::protobuf::State> state;
      process::Owned<master::Registrar> registrar;

      process::Owned<master::Repairer> repairer;

      process::Owned<MasterContender> contender;
      process::Owned<MasterDetector> detector;

      // Only set when an authorizer was created from '--acls'
      // (i.e. not injected by the caller).
      process::Owned<Authorizer> authorizer;

      // Owned, but kept as a raw pointer because the process must be
      // terminated and waited on before deletion (see stop()).
      master::Master* master;
    };

    std::map<process::PID<master::Master>, Master> masters;
  };

  // Abstracts the slaves of a cluster.
  class Slaves
  {
  public:
    // '_masters' is used to create detectors for slaves that do not
    // get one injected.
    Slaves(Cluster* _cluster, Masters* _masters);
    ~Slaves();

    // Stop and clean up all slaves.
    void shutdown();

    // Start a new slave with the provided flags and injections.
    // Injected pointers remain owned by the caller; defaults are
    // created (and owned) here otherwise.
    Try<process::PID<slave::Slave> > start(
        const slave::Flags& flags = slave::Flags(),
        const Option<slave::Containerizer*>& containerizer = None(),
        const Option<MasterDetector*>& detector = None(),
        const Option<slave::GarbageCollector*>& gc = None(),
        const Option<slave::StatusUpdateManager*>& statusUpdateManager =
          None());

    // Stops and cleans up a slave at the specified PID. If 'shutdown'
    // is true than the slave is sent a shutdown message instead of
    // being terminated.
    Try<Nothing> stop(
        const process::PID<slave::Slave>& pid,
        bool shutdown = false);

  private:
    // Not copyable, not assignable.
    Slaves(const Slaves&);
    Slaves& operator = (const Slaves&);

    Cluster* cluster; // Enclosing class.
    Masters* masters; // Used to create MasterDetector instances.

    // Encapsulates a single slave's dependencies.
    struct Slave
    {
      Slave()
        : containerizer(NULL),
          createdContainerizer(false),
          slave(NULL) {}

      // Raw pointer: may be injected (caller-owned) or created here;
      // 'createdContainerizer' records which (see Slaves::stop()).
      slave::Containerizer* containerizer;
      bool createdContainerizer; // Whether we own the containerizer.

      process::Owned<slave::StatusUpdateManager> statusUpdateManager;
      process::Owned<slave::GarbageCollector> gc;
      process::Owned<MasterDetector> detector;
      slave::Flags flags; // Saved so stop() can consult them (cgroups).
      // Owned raw pointer; terminated/waited on before deletion.
      slave::Slave* slave;
    };

    std::map<process::PID<slave::Slave>, Slave> slaves;
  };

  // Shuts down all masters and slaves.
  void shutdown()
  {
    masters.shutdown();
    slaves.shutdown();
  }

  // Cluster wide shared abstractions.
  // NOTE: 'files' is declared before 'masters'/'slaves' so it is
  // constructed first and destroyed last, since both hold a pointer
  // to it via 'cluster'.
  Files files;

  Masters masters;
  Slaves slaves;

private:
  // Not copyable, not assignable.
  Cluster(const Cluster&);
  Cluster& operator = (const Cluster&);
};
| |
| |
// Remembers the enclosing cluster and the (optional) ZooKeeper URL
// used for master contention/detection.
inline Cluster::Masters::Masters(
    Cluster* _cluster,
    const Option<zookeeper::URL>& _url)
  : cluster(_cluster),
    url(_url) {}
| |
| |
// Ensures all masters are stopped and cleaned up on destruction.
inline Cluster::Masters::~Masters()
{
  shutdown();
}
| |
| |
| inline void Cluster::Masters::shutdown() |
| { |
| // TODO(benh): Use utils::copy from stout once namespaced. |
| std::map<process::PID<master::Master>, Master> copy(masters); |
| foreachkey (const process::PID<master::Master>& pid, copy) { |
| stop(pid); |
| } |
| masters.clear(); |
| } |
| |
| |
// Starts a master: wires up the allocator, registry storage,
// contender/detector and (optionally) an authorizer, spawns the
// Master process, and waits for its registrar recovery before
// returning the PID.
inline Try<process::PID<master::Master> > Cluster::Masters::start(
    const master::Flags& flags,
    const Option<master::allocator::AllocatorProcess*>& allocatorProcess,
    const Option<Authorizer*>& authorizer)
{
  // Disallow multiple masters when not using ZooKeeper.
  if (!masters.empty() && url.isNone()) {
    return Error("Can not start multiple masters when not using ZooKeeper");
  }

  Master master;

  // Use the injected allocator process if provided, otherwise create
  // a default hierarchical DRF one. NOTE: when injected, the process
  // is NOT owned here ('master.allocatorProcess' stays unset).
  if (allocatorProcess.isNone()) {
    master.allocatorProcess.reset(
        new master::allocator::HierarchicalDRFAllocatorProcess());
    master.allocator.reset(
        new master::allocator::Allocator(master.allocatorProcess.get()));
  } else {
    master.allocator.reset(
        new master::allocator::Allocator(allocatorProcess.get()));
  }

  // Select the registry persistence backend from the flags: either
  // in-memory storage or a replicated-log backed storage.
  if (flags.registry == "in_memory") {
    if (flags.registry_strict) {
      return Error(
          "Cannot use '--registry_strict' when using in-memory storage based"
          " registry");
    }
    master.storage.reset(new state::InMemoryStorage());
  } else if (flags.registry == "replicated_log") {
    if (flags.work_dir.isNone()) {
      return Error(
          "Need to specify --work_dir for replicated log based registry");
    }

    if (url.isSome()) {
      // Use ZooKeeper based replicated log.
      if (flags.quorum.isNone()) {
        return Error(
            "Need to specify --quorum for replicated log based registry"
            " when using ZooKeeper");
      }
      master.log.reset(new log::Log(
          flags.quorum.get(),
          path::join(flags.work_dir.get(), "replicated_log"),
          url.get().servers,
          flags.zk_session_timeout,
          path::join(url.get().path, "log_replicas"),
          url.get().authentication,
          flags.log_auto_initialize));
    } else {
      // Standalone: a single-replica (quorum of 1) local log.
      master.log.reset(new log::Log(
          1,
          path::join(flags.work_dir.get(), "replicated_log"),
          std::set<process::UPID>(),
          flags.log_auto_initialize));
    }

    master.storage.reset(new state::LogStorage(master.log.get()));
  } else {
    return Error("'" + flags.registry + "' is not a supported option for"
                 " registry persistence");
  }

  CHECK_NOTNULL(master.storage.get());

  master.state.reset(new state::protobuf::State(master.storage.get()));
  master.registrar.reset(new master::Registrar(flags, master.state.get()));
  master.repairer.reset(new master::Repairer());

  // ZooKeeper-based contention/detection when a URL was given,
  // standalone otherwise.
  if (url.isSome()) {
    master.contender.reset(new ZooKeeperMasterContender(url.get()));
    master.detector.reset(new ZooKeeperMasterDetector(url.get()));
  } else {
    master.contender.reset(new StandaloneMasterContender());
    master.detector.reset(new StandaloneMasterDetector());
  }

  // Use the injected authorizer if provided; otherwise create one
  // from '--acls' when set (and own it via 'master.authorizer').
  if (authorizer.isSome()) {
    CHECK_NOTNULL(authorizer.get());
  } else if (flags.acls.isSome()) {
    Try<process::Owned<Authorizer> > authorizer_ =
      Authorizer::create(flags.acls.get());
    if (authorizer_.isError()) {
      return Error("Failed to initialize the authorizer: " +
                   authorizer_.error() + " (see --acls flag)");
    }
    process::Owned<Authorizer> authorizer__ = authorizer_.get();
    master.authorizer = authorizer__;
  }

  // NOTE(review): 'authorizer' is an Option<Authorizer*> while
  // 'master.authorizer.get()' is a raw pointer, so the conditional
  // below converts the pointer to an Option. When neither an
  // authorizer was injected nor '--acls' set, this wraps a NULL
  // pointer -- presumably the Master treats that as "no authorizer";
  // confirm against master::Master's constructor.
  master.master = new master::Master(
      master.allocator.get(),
      master.registrar.get(),
      master.repairer.get(),
      &cluster->files,
      master.contender.get(),
      master.detector.get(),
      authorizer.isSome() ? authorizer : master.authorizer.get(),
      flags);

  if (url.isNone()) {
    // This means we are using the StandaloneMasterDetector.
    // Appoint this master as leader so slaves can detect it.
    StandaloneMasterDetector* detector_ = CHECK_NOTNULL(
        dynamic_cast<StandaloneMasterDetector*>(master.detector.get()));

    detector_->appoint(master.master->info());
  }

  // Set up the expectation BEFORE spawning so the dispatch to
  // _recover cannot be missed.
  process::Future<Nothing> _recover =
    FUTURE_DISPATCH(master.master->self(), &master::Master::_recover);

  process::PID<master::Master> pid = process::spawn(master.master);

  masters[pid] = master;

  // Speed up the tests by ensuring that the Master is recovered
  // before the test proceeds. Otherwise, authentication and
  // registration messages may be dropped, causing delayed retries.
  // NOTE: We use process::internal::await() to avoid awaiting a
  // Future forever when the Clock is paused.
  if (!process::internal::await(
          _recover,
          flags.registry_fetch_timeout + flags.registry_store_timeout)) {
    LOG(FATAL) << "Failed to wait for _recover";
  }

  bool paused = process::Clock::paused();

  // Need to settle the Clock to ensure that the Master finishes
  // executing _recover() before we return.
  process::Clock::pause();
  process::Clock::settle();

  // Return the Clock to its original state.
  if (!paused) {
    process::Clock::resume();
  }

  return pid;
}
| |
| |
| inline Try<Nothing> Cluster::Masters::stop( |
| const process::PID<master::Master>& pid) |
| { |
| if (masters.count(pid) == 0) { |
| return Error("No master found to stop"); |
| } |
| |
| Master master = masters[pid]; |
| |
| process::terminate(master.master); |
| process::wait(master.master); |
| delete master.master; |
| |
| masters.erase(pid); |
| |
| return Nothing(); |
| } |
| |
| |
| inline process::Owned<MasterDetector> Cluster::Masters::detector() |
| { |
| if (url.isSome()) { |
| return process::Owned<MasterDetector>( |
| new ZooKeeperMasterDetector(url.get())); |
| } |
| |
| CHECK(masters.size() == 1); |
| |
| return process::Owned<MasterDetector>( |
| new StandaloneMasterDetector(masters.begin()->first)); |
| } |
| |
| |
// Remembers the enclosing cluster and the masters abstraction (used
// to create detectors for slaves that do not get one injected).
inline Cluster::Slaves::Slaves(Cluster* _cluster, Masters* _masters)
  : cluster(_cluster), masters(_masters) {}
| |
| |
// Ensures all slaves are stopped and cleaned up on destruction.
inline Cluster::Slaves::~Slaves()
{
  shutdown();
}
| |
| |
// Stops and cleans up all slaves, first destroying any containers
// still managed by each slave's containerizer.
inline void Cluster::Slaves::shutdown()
{
  // TODO(benh): Use utils::copy from stout once namespaced.
  // Iterate over a copy since stop() erases entries from 'slaves'.
  std::map<process::PID<slave::Slave>, Slave> copy(slaves);
  foreachpair (const process::PID<slave::Slave>& pid,
               const Slave& slave,
               copy) {
    process::Future<hashset<ContainerID> > containers =
      slave.containerizer->containers();
    AWAIT_READY(containers);

    foreach (const ContainerID& containerId, containers.get()) {
      // We need to wait on the container before destroying it in case someone
      // else has already waited on it (and therefore would be immediately
      // 'reaped' before we could wait on it).
      process::Future<containerizer::Termination> wait =
        slave.containerizer->wait(containerId);

      slave.containerizer->destroy(containerId);

      AWAIT_READY(wait);
    }

    stop(pid);
  }
  slaves.clear();
}
| |
| |
| inline Try<process::PID<slave::Slave> > Cluster::Slaves::start( |
| const slave::Flags& flags, |
| const Option<slave::Containerizer*>& containerizer, |
| const Option<MasterDetector*>& detector, |
| const Option<slave::GarbageCollector*>& gc, |
| const Option<slave::StatusUpdateManager*>& statusUpdateManager) |
| { |
| // TODO(benh): Create a work directory if using the default. |
| |
| Slave slave; |
| |
| if (containerizer.isSome()) { |
| slave.containerizer = containerizer.get(); |
| } else { |
| Try<slave::Containerizer*> containerizer = |
| slave::Containerizer::create(flags, true); |
| CHECK_SOME(containerizer); |
| |
| slave.containerizer = containerizer.get(); |
| slave.createdContainerizer = true; |
| } |
| |
| // Get a detector for the master(s) if one wasn't provided. |
| if (detector.isNone()) { |
| slave.detector = masters->detector(); |
| } |
| |
| // Create a garbage collector if one wasn't provided. |
| if (gc.isNone()) { |
| slave.gc.reset(new slave::GarbageCollector()); |
| } |
| |
| // Create a status update manager if one wasn't provided. |
| if (statusUpdateManager.isNone()) { |
| slave.statusUpdateManager.reset(new slave::StatusUpdateManager(flags)); |
| } |
| |
| slave.flags = flags; |
| |
| slave.slave = new slave::Slave( |
| flags, |
| detector.get(slave.detector.get()), |
| slave.containerizer, |
| &cluster->files, |
| gc.get(slave.gc.get()), |
| statusUpdateManager.get(slave.statusUpdateManager.get())); |
| |
| process::PID<slave::Slave> pid = process::spawn(slave.slave); |
| |
| slaves[pid] = slave; |
| |
| return pid; |
| } |
| |
| |
// Stops and cleans up the slave at 'pid'. If 'shutdown' is true, the
// slave is sent a shutdown message (graceful); otherwise its process
// is terminated. On Linux, also migrates this process's threads back
// to the root cgroup when '--slave_subsystems' was used.
inline Try<Nothing> Cluster::Slaves::stop(
    const process::PID<slave::Slave>& pid,
    bool shutdown)
{
  if (slaves.count(pid) == 0) {
    return Error("No slave found to stop");
  }

  Slave slave = slaves[pid];

  if (shutdown) {
    // An empty UPID as the sender means the shutdown is "self"
    // initiated from the slave's perspective.
    process::dispatch(slave.slave,
                      &slave::Slave::shutdown,
                      process::UPID(),
                      "");
  } else {
    process::terminate(slave.slave);
  }
  // Either way, wait for the process to exit before deleting it.
  process::wait(slave.slave);
  delete slave.slave;

  // Only delete the containerizer if we created it (i.e. it was not
  // injected by the caller).
  if (slave.createdContainerizer) {
    delete slave.containerizer;
  }

#ifdef __linux__
  // Move all of this process's threads back into the root cgroup of
  // each configured subsystem - this simulates the slave process
  // terminating and permits a new slave to start when the
  // --slave_subsystems flag is used.
  if (slave.flags.slave_subsystems.isSome()) {
    foreach (const std::string& subsystem,
             strings::tokenize(slave.flags.slave_subsystems.get(), ",")) {
      std::string hierarchy = path::join(
          slave.flags.cgroups_hierarchy, subsystem);

      std::string cgroup = path::join(slave.flags.cgroups_root, "slave");

      // Sanity check that the slave's cgroup actually exists.
      Try<bool> exists = cgroups::exists(hierarchy, cgroup);
      if (exists.isError() || !exists.get()) {
        EXIT(1) << "Failed to find cgroup " << cgroup
                << " for subsystem " << subsystem
                << " under hierarchy " << hierarchy
                << " for slave: " + exists.error();
      }

      // Move all of our threads into the root cgroup.
      Try<Nothing> assign = cgroups::assign(hierarchy, "", getpid());
      if (assign.isError()) {
        EXIT(1) << "Failed to move slave threads into cgroup " << cgroup
                << " for subsystem " << subsystem
                << " under hierarchy " << hierarchy
                << " for slave: " + assign.error();
      }
    }
  }
#endif // __linux__

  slaves.erase(pid);

  return Nothing();
}
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |
| |
| #endif // __TESTS_CLUSTER_HPP__ |