blob: 582d4a10444831b0753d20650698e5d3b51cca9f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/master/allocator.hpp>
#include <mesos/module/anonymous.hpp>
#include <mesos/module/authorizer.hpp>
#include <mesos/slave/resource_estimator.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
#include <stout/strings.hpp>
#include "common/protobuf_utils.hpp"
#include "local.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/repairer.hpp"
#include "master/allocator/mesos/hierarchical.hpp"
#include "master/allocator/sorter/drf/sorter.hpp"
#include "module/manager.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
#include "state/storage.hpp"
using namespace mesos::internal;
using namespace mesos::internal::log;
using mesos::master::allocator::Allocator;
using mesos::internal::master::allocator::HierarchicalDRFAllocator;
using mesos::internal::master::Master;
using mesos::internal::master::Registrar;
using mesos::internal::master::Repairer;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::Slave;
using mesos::internal::slave::StatusUpdateManager;
using mesos::modules::Anonymous;
using mesos::modules::ModuleManager;
using mesos::slave::QoSController;
using mesos::slave::ResourceEstimator;
using process::Owned;
using process::PID;
using process::RateLimiter;
using process::UPID;
using std::map;
using std::set;
using std::shared_ptr;
using std::string;
using std::stringstream;
using std::vector;
namespace mesos {
namespace internal {
namespace local {
// Process-wide state for the single local cluster managed by launch()
// and shutdown(). Pointers are null (and `authorizer` is None) whenever
// no cluster has been launched or after shutdown() has torn it down.

// Allocator created by launch() when the caller did not supply one;
// stays null when the caller owns the allocator, so shutdown() never
// deletes a caller-provided instance.
static Allocator* allocator = nullptr;

// Registry storage stack. `log` is only created for the
// "replicated_log" registry; `storage` wraps either it or an
// in-memory store, and `state` wraps `storage`.
static Log* log = nullptr;
static state::Storage* storage = nullptr;
static state::protobuf::State* state = nullptr;

// Master-side components.
static Registrar* registrar = nullptr;
static Repairer* repairer = nullptr;
static Master* master = nullptr;

// Every running slave, keyed by the containerizer it was created with;
// shutdown() deletes both halves of each entry.
static map<Containerizer*, Slave*> slaves;

// Leader election/detection shared by the master and all slaves.
static StandaloneMasterDetector* detector = nullptr;
static MasterContender* contender = nullptr;

// Only set when launch() decided an authorizer is required.
static Option<Authorizer*> authorizer = None();

static Files* files = nullptr;

// Per-slave helper objects; element i belongs to the i-th slave.
static vector<GarbageCollector*>* garbageCollectors = nullptr;
static vector<StatusUpdateManager*>* statusUpdateManagers = nullptr;
static vector<Fetcher*>* fetchers = nullptr;
static vector<ResourceEstimator*>* resourceEstimators = nullptr;
static vector<QoSController*>* qosControllers = nullptr;
// Launches an in-process "local" cluster: a single master plus
// `flags.num_slaves` slaves, all spawned in this process. Master and
// slave configuration is loaded from MESOS_-prefixed environment
// variables (master::Flags / slave::Flags). Any configuration or
// construction error terminates the process via EXIT instead of being
// returned to the caller.
//
// If `_allocator` is NULL, a default HierarchicalDRFAllocator is created
// and owned by this module (shutdown() deletes it). Otherwise the caller
// keeps ownership of the allocator it passed in.
//
// Returns the PID of the spawned master. Only one local cluster may be
// running at a time; launching a second one is a fatal error.
PID<Master> launch(const Flags& flags, Allocator* _allocator)
{
  if (master != NULL) {
    LOG(FATAL) << "Can only launch one local cluster at a time (for now)";
  }

  if (_allocator == NULL) {
    // Create a default allocator.
    Try<Allocator*> defaultAllocator = HierarchicalDRFAllocator::create();
    if (defaultAllocator.isError()) {
      EXIT(1) << "Failed to create an instance of HierarchicalDRFAllocator: "
              << defaultAllocator.error();
    }

    // Use the default allocator for the master created below. Note that
    // `_allocator` is a by-value parameter, so the caller's pointer is
    // not modified.
    _allocator = defaultAllocator.get();

    // Remember the instance so that shutdown() can delete it.
    allocator = defaultAllocator.get();
  } else {
    // The caller owns `_allocator`; make sure shutdown() won't delete it.
    // TODO(benh): Figure out the behavior of allocator pointer and remove the
    // else block.
    allocator = NULL;
  }

  files = new Files();

  {
    // NOTE: Within this scope `flags` refers to the master's flags and
    // shadows the cluster-level `flags` parameter.
    master::Flags flags;
    Try<Nothing> load = flags.load("MESOS_");
    if (load.isError()) {
      EXIT(1) << "Failed to start a local cluster while loading "
              << "master flags from the environment: " << load.error();
    }

    // Load modules. Note that this covers both, master and slave
    // specific modules as both use the same flag (--modules).
    if (flags.modules.isSome()) {
      Try<Nothing> result = ModuleManager::load(flags.modules.get());
      if (result.isError()) {
        EXIT(1) << "Error loading modules: " << result.error();
      }
    }

    // Build the registry storage stack selected by --registry.
    if (flags.registry == "in_memory") {
      if (flags.registry_strict) {
        EXIT(1) << "Cannot use '--registry_strict' when using in-memory storage"
                << " based registry";
      }
      storage = new state::InMemoryStorage();
    } else if (flags.registry == "replicated_log") {
      // For local runs, we use a temporary work directory.
      if (flags.work_dir.isNone()) {
        CHECK_SOME(os::mkdir("/tmp/mesos/local"));
        Try<string> directory = os::mkdtemp("/tmp/mesos/local/XXXXXX");
        CHECK_SOME(directory);
        flags.work_dir = directory.get();
      }

      // TODO(vinod): Add support for replicated log with ZooKeeper.
      log = new Log(
          1,
          path::join(flags.work_dir.get(), "replicated_log"),
          set<UPID>(),
          flags.log_auto_initialize);
      storage = new state::LogStorage(log);
    } else {
      EXIT(1) << "'" << flags.registry << "' is not a supported"
              << " option for registry persistence";
    }

    CHECK_NOTNULL(storage);

    state = new state::protobuf::State(storage);
    registrar = new Registrar(flags, state);
    repairer = new Repairer();

    contender = new StandaloneMasterContender();
    detector = new StandaloneMasterDetector();

    auto authorizerNames = strings::split(flags.authorizers, ",");
    if (authorizerNames.empty()) {
      EXIT(EXIT_FAILURE) << "No authorizer specified";
    }
    if (authorizerNames.size() > 1) {
      EXIT(EXIT_FAILURE) << "Multiple authorizers not supported";
    }
    string authorizerName = authorizerNames[0];

    // NOTE: The flag --authorizers overrides the flag --acls, i.e. if
    // a non default authorizer is requested, it will be used and
    // the contents of --acls will be ignored.
    // TODO(arojas): Add support for multiple authorizers.
    if (authorizerName != master::DEFAULT_AUTHORIZER ||
        flags.acls.isSome()) {
      Try<Authorizer*> create = Authorizer::create(authorizerName);
      if (create.isError()) {
        EXIT(EXIT_FAILURE) << "Could not create '" << authorizerName
                           << "' authorizer: " << create.error();
      }

      authorizer = create.get();

      LOG(INFO) << "Using '" << authorizerName << "' authorizer";

      if (authorizerName == master::DEFAULT_AUTHORIZER) {
        // `flags.acls` is guaranteed to be set here: the default
        // authorizer only passes the enclosing condition when --acls
        // was provided.
        Try<Nothing> initialize =
          authorizer.get()->initialize(flags.acls.get());

        if (initialize.isError()) {
          // Running with an authorizer that failed to initialize would
          // lead to undefined behavior, so refuse to start the cluster.
          // EXIT terminates the process; no cleanup can follow it.
          EXIT(EXIT_FAILURE) << "Failed to initialize '"
                             << authorizerName << "' authorizer: "
                             << initialize.error();
        }
      } else if (flags.acls.isSome()) {
        LOG(WARNING) << "Ignoring contents of --acls flag, because '"
                     << authorizerName << "' authorizer will be used instead"
                     << " of the default.";
      }
    }

    Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
    if (flags.slave_removal_rate_limit.isSome()) {
      // Parse the flag value.
      // TODO(vinod): Move this parsing logic to flags once we have a
      // 'Rate' abstraction in stout.
      vector<string> tokens =
        strings::tokenize(flags.slave_removal_rate_limit.get(), "/");

      if (tokens.size() != 2) {
        EXIT(1) << "Invalid slave_removal_rate_limit: "
                << flags.slave_removal_rate_limit.get()
                << ". Format is <Number of slaves>/<Duration>";
      }

      Try<int> permits = numify<int>(tokens[0]);
      if (permits.isError()) {
        EXIT(1) << "Invalid slave_removal_rate_limit: "
                << flags.slave_removal_rate_limit.get()
                << ". Format is <Number of slaves>/<Duration>"
                << ": " << permits.error();
      }

      Try<Duration> duration = Duration::parse(tokens[1]);
      if (duration.isError()) {
        EXIT(1) << "Invalid slave_removal_rate_limit: "
                << flags.slave_removal_rate_limit.get()
                << ". Format is <Number of slaves>/<Duration>"
                << ": " << duration.error();
      }

      slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
    }

    // Create anonymous modules.
    foreach (const string& name, ModuleManager::find<Anonymous>()) {
      Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
      if (create.isError()) {
        EXIT(1) << "Failed to create anonymous module named '" << name
                << "': " << create.error();
      }

      // We don't bother keeping around the pointer to this anonymous
      // module, when we exit that will effectively free it's memory.
      //
      // TODO(benh): We might want to add explicit finalization (and
      // maybe explicit initialization too) in order to let the module
      // do any housekeeping necessary when the master is cleanly
      // terminating.
    }

    master = new Master(
        _allocator,
        registrar,
        repairer,
        files,
        contender,
        detector,
        authorizer,
        slaveRemovalLimiter,
        flags);

    detector->appoint(master->info());
  }

  PID<Master> pid = process::spawn(master);

  garbageCollectors = new vector<GarbageCollector*>();
  statusUpdateManagers = new vector<StatusUpdateManager*>();
  fetchers = new vector<Fetcher*>();
  resourceEstimators = new vector<ResourceEstimator*>();
  qosControllers = new vector<QoSController*>();

  for (int i = 0; i < flags.num_slaves; i++) {
    // NOTE: This `flags` shadows the cluster-level `flags` parameter
    // for the rest of the loop body; the loop condition above still
    // reads the outer (cluster-level) flags.
    slave::Flags flags;
    Try<Nothing> load = flags.load("MESOS_");
    if (load.isError()) {
      EXIT(1) << "Failed to start a local cluster while loading "
              << "slave flags from the environment: " << load.error();
    }

    // Use a different work directory for each slave.
    flags.work_dir = path::join(flags.work_dir, stringify(i));

    // Per-slave helpers; the slave below uses the last element pushed
    // onto each vector.
    garbageCollectors->push_back(new GarbageCollector());
    statusUpdateManagers->push_back(new StatusUpdateManager(flags));
    fetchers->push_back(new Fetcher());

    Try<ResourceEstimator*> resourceEstimator =
      ResourceEstimator::create(flags.resource_estimator);

    if (resourceEstimator.isError()) {
      EXIT(1) << "Failed to create resource estimator: "
              << resourceEstimator.error();
    }

    resourceEstimators->push_back(resourceEstimator.get());

    Try<QoSController*> qosController =
      QoSController::create(flags.qos_controller);

    if (qosController.isError()) {
      EXIT(1) << "Failed to create QoS Controller: "
              << qosController.error();
    }

    qosControllers->push_back(qosController.get());

    // Set default launcher to 'posix'(see MESOS-3793).
    if (flags.launcher.isNone()) {
      flags.launcher = "posix";
    }

    Try<Containerizer*> containerizer =
      Containerizer::create(flags, true, fetchers->back());

    if (containerizer.isError()) {
      EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
    }

    // NOTE: At this point detector is already initialized by the
    // Master.
    Slave* slave = new Slave(
        flags,
        detector,
        containerizer.get(),
        files,
        garbageCollectors->back(),
        statusUpdateManagers->back(),
        resourceEstimators->back(),
        qosControllers->back());

    // Record the pair so shutdown() can stop the slave and free both.
    slaves[containerizer.get()] = slave;

    process::spawn(slave);
  }

  return pid;
}
// Deletes every element owned by `*items`, then the vector itself, and
// resets `items` to NULL. Elements are deleted in vector order.
template <typename T>
static void deleteAllAndReset(vector<T*>*& items)
{
  for (T* item : *items) {
    delete item;
  }
  delete items;
  items = NULL;
}


// Tears down the local cluster started by launch(), if one is running;
// otherwise does nothing. The master is stopped and freed first, then
// each slave together with its containerizer, then the shared services
// (authorizer, detector, contender, files), the per-slave helpers, and
// finally the registrar/repairer and the registry storage stack.
void shutdown()
{
  if (master == NULL) {
    return;
  }

  // Stop the master and wait for it to exit before freeing it. The
  // allocator pointer is only non-NULL when launch() created it.
  process::terminate(master->self());
  process::wait(master->self());
  delete master;
  delete allocator;
  master = NULL;

  // TODO(benh): Ugh! Because the isolator calls back into the slave
  // (not the best design) we can't delete the slave until we have
  // deleted the isolator. But since the slave calls into the
  // isolator, we can't delete the isolator until we have stopped
  // the slave.
  for (const auto& entry : slaves) {
    Containerizer* containerizer = entry.first;
    Slave* slave = entry.second;

    process::terminate(slave->self());
    process::wait(slave->self());

    delete containerizer;
    delete slave;
  }
  slaves.clear();

  if (authorizer.isSome()) {
    delete authorizer.get();
    authorizer = None();
  }

  delete detector;
  detector = NULL;

  delete contender;
  contender = NULL;

  delete files;
  files = NULL;

  // Per-slave helpers, in the order launch() allocated them.
  deleteAllAndReset(garbageCollectors);
  deleteAllAndReset(statusUpdateManagers);
  deleteAllAndReset(fetchers);
  deleteAllAndReset(resourceEstimators);
  deleteAllAndReset(qosControllers);

  delete registrar;
  registrar = NULL;

  delete repairer;
  repairer = NULL;

  // Storage stack, outermost wrapper first.
  delete state;
  state = NULL;

  delete storage;
  storage = NULL;

  delete log;
  log = NULL;
}
} // namespace local {
} // namespace internal {
} // namespace mesos {