| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <sstream> |
| #include <string> |
| #include <vector> |
| |
| #include <mesos/authorizer/authorizer.hpp> |
| |
| #include <mesos/master/allocator.hpp> |
| |
| #include <mesos/module/anonymous.hpp> |
| #include <mesos/module/authorizer.hpp> |
| |
| #include <mesos/slave/resource_estimator.hpp> |
| |
| #include <process/limiter.hpp> |
| #include <process/owned.hpp> |
| #include <process/pid.hpp> |
| |
| #include <stout/duration.hpp> |
| #include <stout/exit.hpp> |
| #include <stout/foreach.hpp> |
| #include <stout/os.hpp> |
| #include <stout/path.hpp> |
| #include <stout/try.hpp> |
| #include <stout/strings.hpp> |
| |
| #include "common/protobuf_utils.hpp" |
| |
| #include "local.hpp" |
| |
| #include "logging/flags.hpp" |
| #include "logging/logging.hpp" |
| |
| #include "master/contender.hpp" |
| #include "master/detector.hpp" |
| #include "master/master.hpp" |
| #include "master/registrar.hpp" |
| #include "master/repairer.hpp" |
| |
| #include "master/allocator/mesos/hierarchical.hpp" |
| #include "master/allocator/sorter/drf/sorter.hpp" |
| |
| #include "module/manager.hpp" |
| |
| #include "slave/gc.hpp" |
| #include "slave/slave.hpp" |
| #include "slave/status_update_manager.hpp" |
| |
| #include "slave/containerizer/containerizer.hpp" |
| #include "slave/containerizer/fetcher.hpp" |
| |
| #include "state/in_memory.hpp" |
| #include "state/log.hpp" |
| #include "state/protobuf.hpp" |
| #include "state/storage.hpp" |
| |
| using namespace mesos::internal; |
| using namespace mesos::internal::log; |
| |
| using mesos::master::allocator::Allocator; |
| |
| using mesos::internal::master::allocator::HierarchicalDRFAllocator; |
| |
| using mesos::internal::master::Master; |
| using mesos::internal::master::Registrar; |
| using mesos::internal::master::Repairer; |
| |
| using mesos::internal::slave::Containerizer; |
| using mesos::internal::slave::Fetcher; |
| using mesos::internal::slave::GarbageCollector; |
| using mesos::internal::slave::Slave; |
| using mesos::internal::slave::StatusUpdateManager; |
| |
| using mesos::modules::Anonymous; |
| using mesos::modules::ModuleManager; |
| |
| using mesos::slave::QoSController; |
| using mesos::slave::ResourceEstimator; |
| |
| using process::Owned; |
| using process::PID; |
| using process::RateLimiter; |
| using process::UPID; |
| |
| using std::map; |
| using std::set; |
| using std::shared_ptr; |
| using std::string; |
| using std::stringstream; |
| using std::vector; |
| |
| |
| namespace mesos { |
| namespace internal { |
| namespace local { |
| |
| static Allocator* allocator = NULL; |
| static Log* log = NULL; |
| static state::Storage* storage = NULL; |
| static state::protobuf::State* state = NULL; |
| static Registrar* registrar = NULL; |
| static Repairer* repairer = NULL; |
| static Master* master = NULL; |
| static map<Containerizer*, Slave*> slaves; |
| static StandaloneMasterDetector* detector = NULL; |
| static MasterContender* contender = NULL; |
| static Option<Authorizer*> authorizer = None(); |
| static Files* files = NULL; |
| static vector<GarbageCollector*>* garbageCollectors = NULL; |
| static vector<StatusUpdateManager*>* statusUpdateManagers = NULL; |
| static vector<Fetcher*>* fetchers = NULL; |
| static vector<ResourceEstimator*>* resourceEstimators = NULL; |
| static vector<QoSController*>* qosControllers = NULL; |
| |
| |
| PID<Master> launch(const Flags& flags, Allocator* _allocator) |
| { |
| if (master != NULL) { |
| LOG(FATAL) << "Can only launch one local cluster at a time (for now)"; |
| } |
| |
| if (_allocator == NULL) { |
| // Create a default allocator. |
| Try<Allocator*> defaultAllocator = HierarchicalDRFAllocator::create(); |
| if (defaultAllocator.isError()) { |
| EXIT(1) << "Failed to create an instance of HierarchicalDRFAllocator: " |
| << defaultAllocator.error(); |
| } |
| |
| // Update caller's instance. |
| _allocator = defaultAllocator.get(); |
| |
| // Save the instance for deleting later. |
| allocator = defaultAllocator.get(); |
| } else { |
| // TODO(benh): Figure out the behavior of allocator pointer and remove the |
| // else block. |
| allocator = NULL; |
| } |
| |
| files = new Files(); |
| |
| { |
| master::Flags flags; |
| Try<Nothing> load = flags.load("MESOS_"); |
| if (load.isError()) { |
| EXIT(1) << "Failed to start a local cluster while loading " |
| << "master flags from the environment: " << load.error(); |
| } |
| |
| // Load modules. Note that this covers both, master and slave |
| // specific modules as both use the same flag (--modules). |
| if (flags.modules.isSome()) { |
| Try<Nothing> result = ModuleManager::load(flags.modules.get()); |
| if (result.isError()) { |
| EXIT(1) << "Error loading modules: " << result.error(); |
| } |
| } |
| |
| if (flags.registry == "in_memory") { |
| if (flags.registry_strict) { |
| EXIT(1) << "Cannot use '--registry_strict' when using in-memory storage" |
| << " based registry"; |
| } |
| storage = new state::InMemoryStorage(); |
| } else if (flags.registry == "replicated_log") { |
| // For local runs, we use a temporary work directory. |
| if (flags.work_dir.isNone()) { |
| CHECK_SOME(os::mkdir("/tmp/mesos/local")); |
| |
| Try<string> directory = os::mkdtemp("/tmp/mesos/local/XXXXXX"); |
| CHECK_SOME(directory); |
| flags.work_dir = directory.get(); |
| } |
| |
| // TODO(vinod): Add support for replicated log with ZooKeeper. |
| log = new Log( |
| 1, |
| path::join(flags.work_dir.get(), "replicated_log"), |
| set<UPID>(), |
| flags.log_auto_initialize); |
| storage = new state::LogStorage(log); |
| } else { |
| EXIT(1) << "'" << flags.registry << "' is not a supported" |
| << " option for registry persistence"; |
| } |
| |
| CHECK_NOTNULL(storage); |
| |
| state = new state::protobuf::State(storage); |
| registrar = new Registrar(flags, state); |
| repairer = new Repairer(); |
| |
| contender = new StandaloneMasterContender(); |
| detector = new StandaloneMasterDetector(); |
| |
| auto authorizerNames = strings::split(flags.authorizers, ","); |
| if (authorizerNames.empty()) { |
| EXIT(EXIT_FAILURE) << "No authorizer specified"; |
| } |
| if (authorizerNames.size() > 1) { |
| EXIT(EXIT_FAILURE) << "Multiple authorizers not supported"; |
| } |
| std::string authorizerName = authorizerNames[0]; |
| |
| // NOTE: The flag --authorizers overrides the flag --acls, i.e. if |
| // a non default authorizer is requested, it will be used and |
| // the contents of --acls will be ignored. |
| // TODO(arojas): Add support for multiple authorizers. |
| if (authorizerName != master::DEFAULT_AUTHORIZER || |
| flags.acls.isSome()) { |
| Try<Authorizer*> create = Authorizer::create(authorizerName); |
| |
| if (create.isError()) { |
| EXIT(EXIT_FAILURE) << "Could not create '" << authorizerName |
| << "' authorizer: " << create.error(); |
| } |
| |
| authorizer = create.get(); |
| |
| LOG(INFO) << "Using '" << authorizerName << "' authorizer"; |
| |
| if (authorizerName == master::DEFAULT_AUTHORIZER) { |
| Try<Nothing> initialize = |
| authorizer.get()->initialize(flags.acls.get()); |
| |
| if (initialize.isError()) { |
| // Failing to initialize the authorizer leads to undefined |
| // behavior, therefore we default to skip authorization |
| // altogether. |
| EXIT(EXIT_FAILURE) << "Failed to initialize '" |
| << authorizerName << "' authorizer: " |
| << initialize.error(); |
| |
| delete authorizer.get(); |
| authorizer = None(); |
| } |
| } else if (flags.acls.isSome()) { |
| LOG(WARNING) << "Ignoring contents of --acls flag, because '" |
| << authorizerName << "' authorizer will be used instead" |
| << " of the default."; |
| } |
| } |
| |
| Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None(); |
| if (flags.slave_removal_rate_limit.isSome()) { |
| // Parse the flag value. |
| // TODO(vinod): Move this parsing logic to flags once we have a |
| // 'Rate' abstraction in stout. |
| vector<string> tokens = |
| strings::tokenize(flags.slave_removal_rate_limit.get(), "/"); |
| |
| if (tokens.size() != 2) { |
| EXIT(1) << "Invalid slave_removal_rate_limit: " |
| << flags.slave_removal_rate_limit.get() |
| << ". Format is <Number of slaves>/<Duration>"; |
| } |
| |
| Try<int> permits = numify<int>(tokens[0]); |
| if (permits.isError()) { |
| EXIT(1) << "Invalid slave_removal_rate_limit: " |
| << flags.slave_removal_rate_limit.get() |
| << ". Format is <Number of slaves>/<Duration>" |
| << ": " << permits.error(); |
| } |
| |
| Try<Duration> duration = Duration::parse(tokens[1]); |
| if (duration.isError()) { |
| EXIT(1) << "Invalid slave_removal_rate_limit: " |
| << flags.slave_removal_rate_limit.get() |
| << ". Format is <Number of slaves>/<Duration>" |
| << ": " << duration.error(); |
| } |
| |
| slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get()); |
| } |
| |
| // Create anonymous modules. |
| foreach (const string& name, ModuleManager::find<Anonymous>()) { |
| Try<Anonymous*> create = ModuleManager::create<Anonymous>(name); |
| if (create.isError()) { |
| EXIT(1) << "Failed to create anonymous module named '" << name << "'"; |
| } |
| |
| // We don't bother keeping around the pointer to this anonymous |
| // module, when we exit that will effectively free it's memory. |
| // |
| // TODO(benh): We might want to add explicit finalization (and |
| // maybe explicit initialization too) in order to let the module |
| // do any housekeeping necessary when the master is cleanly |
| // terminating. |
| } |
| |
| master = new Master( |
| _allocator, |
| registrar, |
| repairer, |
| files, |
| contender, |
| detector, |
| authorizer, |
| slaveRemovalLimiter, |
| flags); |
| |
| detector->appoint(master->info()); |
| } |
| |
| PID<Master> pid = process::spawn(master); |
| |
| garbageCollectors = new vector<GarbageCollector*>(); |
| statusUpdateManagers = new vector<StatusUpdateManager*>(); |
| fetchers = new vector<Fetcher*>(); |
| resourceEstimators = new vector<ResourceEstimator*>(); |
| qosControllers = new vector<QoSController*>(); |
| |
| vector<UPID> pids; |
| |
| for (int i = 0; i < flags.num_slaves; i++) { |
| slave::Flags flags; |
| Try<Nothing> load = flags.load("MESOS_"); |
| |
| if (load.isError()) { |
| EXIT(1) << "Failed to start a local cluster while loading " |
| << "slave flags from the environment: " << load.error(); |
| } |
| |
| // Use a different work directory for each slave. |
| flags.work_dir = path::join(flags.work_dir, stringify(i)); |
| |
| garbageCollectors->push_back(new GarbageCollector()); |
| statusUpdateManagers->push_back(new StatusUpdateManager(flags)); |
| fetchers->push_back(new Fetcher()); |
| |
| Try<ResourceEstimator*> resourceEstimator = |
| ResourceEstimator::create(flags.resource_estimator); |
| |
| if (resourceEstimator.isError()) { |
| EXIT(1) << "Failed to create resource estimator: " |
| << resourceEstimator.error(); |
| } |
| |
| resourceEstimators->push_back(resourceEstimator.get()); |
| |
| Try<QoSController*> qosController = |
| QoSController::create(flags.qos_controller); |
| |
| if (qosController.isError()) { |
| EXIT(1) << "Failed to create QoS Controller: " |
| << qosController.error(); |
| } |
| |
| qosControllers->push_back(qosController.get()); |
| |
| // Set default launcher to 'posix'(see MESOS-3793). |
| if (flags.launcher.isNone()) { |
| flags.launcher = "posix"; |
| } |
| |
| Try<Containerizer*> containerizer = |
| Containerizer::create(flags, true, fetchers->back()); |
| |
| if (containerizer.isError()) { |
| EXIT(1) << "Failed to create a containerizer: " << containerizer.error(); |
| } |
| |
| // NOTE: At this point detector is already initialized by the |
| // Master. |
| Slave* slave = new Slave( |
| flags, |
| detector, |
| containerizer.get(), |
| files, |
| garbageCollectors->back(), |
| statusUpdateManagers->back(), |
| resourceEstimators->back(), |
| qosControllers->back()); |
| |
| slaves[containerizer.get()] = slave; |
| |
| pids.push_back(process::spawn(slave)); |
| } |
| |
| return pid; |
| } |
| |
| |
| void shutdown() |
| { |
| if (master != NULL) { |
| process::terminate(master->self()); |
| process::wait(master->self()); |
| delete master; |
| delete allocator; |
| master = NULL; |
| |
| // TODO(benh): Ugh! Because the isolator calls back into the slave |
| // (not the best design) we can't delete the slave until we have |
| // deleted the isolator. But since the slave calls into the |
| // isolator, we can't delete the isolator until we have stopped |
| // the slave. |
| |
| foreachpair (Containerizer* containerizer, Slave* slave, slaves) { |
| process::terminate(slave->self()); |
| process::wait(slave->self()); |
| delete containerizer; |
| delete slave; |
| } |
| |
| slaves.clear(); |
| |
| if (authorizer.isSome()) { |
| delete authorizer.get(); |
| authorizer = None(); |
| } |
| |
| delete detector; |
| detector = NULL; |
| |
| delete contender; |
| contender = NULL; |
| |
| delete files; |
| files = NULL; |
| |
| foreach (GarbageCollector* gc, *garbageCollectors) { |
| delete gc; |
| } |
| |
| delete garbageCollectors; |
| garbageCollectors = NULL; |
| |
| foreach (StatusUpdateManager* statusUpdateManager, *statusUpdateManagers) { |
| delete statusUpdateManager; |
| } |
| |
| delete statusUpdateManagers; |
| statusUpdateManagers = NULL; |
| |
| foreach (Fetcher* fetcher, *fetchers) { |
| delete fetcher; |
| } |
| |
| delete fetchers; |
| fetchers = NULL; |
| |
| foreach (ResourceEstimator* estimator, *resourceEstimators) { |
| delete estimator; |
| } |
| |
| delete resourceEstimators; |
| resourceEstimators = NULL; |
| |
| foreach (QoSController* controller, *qosControllers) { |
| delete controller; |
| } |
| |
| delete qosControllers; |
| qosControllers = NULL; |
| |
| delete registrar; |
| registrar = NULL; |
| |
| delete repairer; |
| repairer = NULL; |
| |
| delete state; |
| state = NULL; |
| |
| delete storage; |
| storage = NULL; |
| |
| delete log; |
| log = NULL; |
| } |
| } |
| |
| } // namespace local { |
| } // namespace internal { |
| } // namespace mesos { |