blob: 3e671f6bc9367a88303ad83394941d3c21a1945b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/allocator/allocator.hpp>
#include <mesos/module/anonymous.hpp>
#include <mesos/module/authorizer.hpp>
#include <mesos/module/contender.hpp>
#include <mesos/module/detector.hpp>
#include <mesos/module/secret_resolver.hpp>
#include <mesos/secret/resolver.hpp>
#include <mesos/slave/resource_estimator.hpp>
#include <mesos/state/in_memory.hpp>
#ifndef __WINDOWS__
#include <mesos/state/log.hpp>
#endif // __WINDOWS__
#include <mesos/state/protobuf.hpp>
#include <mesos/state/storage.hpp>
#include <process/limiter.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
#include <stout/strings.hpp>
#include "common/protobuf_utils.hpp"
#include "local.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/allocator/mesos/hierarchical.hpp"
#include "master/allocator/sorter/drf/sorter.hpp"
#include "master/contender/standalone.hpp"
#include "master/detector/standalone.hpp"
#include "module/manager.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"
using namespace mesos::internal;
#ifndef __WINDOWS__
using namespace mesos::internal::log;
using mesos::log::Log;
#endif // __WINDOWS__
using mesos::allocator::Allocator;
using mesos::master::contender::MasterContender;
using mesos::master::contender::StandaloneMasterContender;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::internal::master::allocator::HierarchicalDRFAllocator;
using mesos::internal::master::Master;
using mesos::internal::master::Registrar;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::GarbageCollector;
using mesos::internal::slave::Slave;
using mesos::internal::slave::StatusUpdateManager;
using mesos::modules::Anonymous;
using mesos::modules::ModuleManager;
using mesos::slave::QoSController;
using mesos::slave::ResourceEstimator;
using process::Owned;
using process::PID;
using process::RateLimiter;
using process::UPID;
using std::map;
using std::set;
using std::shared_ptr;
using std::string;
using std::stringstream;
using std::vector;
namespace mesos {
namespace internal {
namespace local {
// File-level state describing the (single) local cluster. Everything below
// is populated by `launch()` and torn down again by `shutdown()`.

// Only set when `launch()` had to create the default allocator itself;
// it stays null when the caller passed its own allocator to `launch()`.
static Allocator* allocator = nullptr;
#ifndef __WINDOWS__
// Only created when the master's registry flag is "replicated_log".
static Log* log = nullptr;
#endif // __WINDOWS__
// Registry persistence chain: `storage` backs `state`, which backs
// `registrar`.
static mesos::state::Storage* storage = nullptr;
static mesos::state::protobuf::State* state = nullptr;
static Registrar* registrar = nullptr;
// Non-null exactly while a local cluster is running; used by `launch()`
// and `shutdown()` as the "is a cluster active" marker.
static Master* master = nullptr;
// Each launched agent, keyed by the containerizer that was created for it.
static map<Containerizer*, Slave*> slaves;
static StandaloneMasterDetector* detector = nullptr;
static MasterContender* contender = nullptr;
// Set only if an authorizer was created; the same instance is handed to
// the master and to every agent.
static Option<Authorizer*> authorizer_ = None();
// Shared by the master and all agents.
static Files* files = nullptr;
// Per-agent helper objects; `launch()` appends one entry to each vector for
// every agent it creates.
static vector<GarbageCollector*>* garbageCollectors = nullptr;
static vector<StatusUpdateManager*>* statusUpdateManagers = nullptr;
static vector<Fetcher*>* fetchers = nullptr;
static vector<ResourceEstimator*>* resourceEstimators = nullptr;
static vector<QoSController*>* qosControllers = nullptr;
// Launches a local, in-process cluster consisting of a single master and
// `flags.num_slaves` agents, and returns the PID of the spawned master.
//
// If `_allocator` is null, a `HierarchicalDRFAllocator` is created and
// remembered in the file-level `allocator` so that `shutdown()` can delete
// it. A caller-supplied allocator is passed to the master as-is and is not
// recorded for deletion by `shutdown()`.
//
// Master and agent flags are loaded from `MESOS_`-prefixed environment
// variables; only the work/runtime/fetcher-cache directories are derived
// from `flags.work_dir`.
//
// Every setup failure terminates the process via `EXIT(EXIT_FAILURE)`.
// Calling this while a cluster is already running is a fatal error.
PID<Master> launch(const Flags& flags, Allocator* _allocator)
{
  if (master != nullptr) {
    LOG(FATAL) << "Can only launch one local cluster at a time (for now)";
  }

  if (_allocator == nullptr) {
    // Create a default allocator.
    Try<Allocator*> defaultAllocator = HierarchicalDRFAllocator::create();
    if (defaultAllocator.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create an instance of HierarchicalDRFAllocator: "
        << defaultAllocator.error();
    }

    // Update caller's instance.
    _allocator = defaultAllocator.get();

    // Save the instance for deleting later.
    allocator = defaultAllocator.get();
  } else {
    // TODO(benh): Figure out the behavior of allocator pointer and
    // remove the else block.
    allocator = nullptr;
  }

  files = new Files();

  {
    // Use to propagate necessary flags to master. Without this, the
    // load call will spit out an error unless their corresponding
    // environment variables explicitly set.
    map<string, string> propagatedFlags;

    propagatedFlags["work_dir"] = path::join(flags.work_dir, "master");

    master::Flags masterFlags;
    Try<flags::Warnings> load = masterFlags.load(
        propagatedFlags,
        false,
        "MESOS_");

    if (load.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to start a local cluster while loading"
        << " master flags from the environment: " << load.error();
    }

    // Log any flag warnings.
    foreach (const flags::Warning& warning, load->warnings) {
      LOG(WARNING) << warning.message;
    }

    // Attempt to create the `work_dir` for master as a convenience.
    Try<Nothing> mkdir = os::mkdir(masterFlags.work_dir.get());
    if (mkdir.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create the work_dir for master '"
        << masterFlags.work_dir.get() << "': " << mkdir.error();
    }

    // Load modules. Note that this covers both, master and slave
    // specific modules as both use the same flag (--modules).
    if (masterFlags.modules.isSome()) {
      Try<Nothing> result = ModuleManager::load(masterFlags.modules.get());
      if (result.isError()) {
        EXIT(EXIT_FAILURE) << "Error loading modules: " << result.error();
      }
    }

    if (masterFlags.registry == "in_memory") {
      storage = new mesos::state::InMemoryStorage();
#ifndef __WINDOWS__
    } else if (masterFlags.registry == "replicated_log") {
      // TODO(vinod): Add support for replicated log with ZooKeeper.
      log = new Log(
          1,
          path::join(masterFlags.work_dir.get(), "replicated_log"),
          set<UPID>(),
          masterFlags.log_auto_initialize,
          "registrar/");
      storage = new mesos::state::LogStorage(log);
#endif // __WINDOWS__
    } else {
      EXIT(EXIT_FAILURE)
        << "'" << masterFlags.registry << "' is not a supported"
        << " option for registry persistence";
    }

    CHECK_NOTNULL(storage);

    state = new mesos::state::protobuf::State(storage);
    registrar = new Registrar(
        masterFlags,
        state,
        master::READONLY_HTTP_AUTHENTICATION_REALM);

    contender = new StandaloneMasterContender();
    detector = new StandaloneMasterDetector();

    auto authorizerNames = strings::split(masterFlags.authorizers, ",");
    if (authorizerNames.empty()) {
      EXIT(EXIT_FAILURE) << "No authorizer specified";
    }
    if (authorizerNames.size() > 1) {
      EXIT(EXIT_FAILURE) << "Multiple authorizers not supported";
    }
    string authorizerName = authorizerNames[0];

    // NOTE: The flag --authorizers overrides the flag --acls, i.e. if
    // a non default authorizer is requested, it will be used and
    // the contents of --acls will be ignored.
    // TODO(arojas): Consider adding support for multiple authorizers.
    Result<Authorizer*> authorizer((None()));
    if (authorizerName != master::DEFAULT_AUTHORIZER) {
      LOG(INFO) << "Creating '" << authorizerName << "' authorizer";

      authorizer = Authorizer::create(authorizerName);
    } else {
      // `authorizerName` is `DEFAULT_AUTHORIZER` at this point.
      if (masterFlags.acls.isSome()) {
        LOG(INFO) << "Creating default '" << authorizerName << "' authorizer";

        authorizer = Authorizer::create(masterFlags.acls.get());
      }
    }

    // `authorizer` stays `None` when the default authorizer is selected
    // without any ACLs; in that case `authorizer_` is left unset.
    if (authorizer.isError()) {
      EXIT(EXIT_FAILURE) << "Could not create '" << authorizerName
                         << "' authorizer: " << authorizer.error();
    } else if (authorizer.isSome()) {
      authorizer_ = authorizer.get();
    }

    Option<shared_ptr<RateLimiter>> slaveRemovalLimiter = None();
    if (masterFlags.agent_removal_rate_limit.isSome()) {
      // Parse the flag value.
      // TODO(vinod): Move this parsing logic to flags once we have a
      // 'Rate' abstraction in stout.
      vector<string> tokens =
        strings::tokenize(masterFlags.agent_removal_rate_limit.get(), "/");

      if (tokens.size() != 2) {
        EXIT(EXIT_FAILURE)
          << "Invalid agent_removal_rate_limit: "
          << masterFlags.agent_removal_rate_limit.get()
          << ". Format is <Number of agents>/<Duration>";
      }

      Try<int> permits = numify<int>(tokens[0]);
      if (permits.isError()) {
        EXIT(EXIT_FAILURE)
          << "Invalid agent_removal_rate_limit: "
          << masterFlags.agent_removal_rate_limit.get()
          << ". Format is <Number of agents>/<Duration>"
          << ": " << permits.error();
      }

      Try<Duration> duration = Duration::parse(tokens[1]);
      if (duration.isError()) {
        EXIT(EXIT_FAILURE)
          << "Invalid agent_removal_rate_limit: "
          << masterFlags.agent_removal_rate_limit.get()
          << ". Format is <Number of agents>/<Duration>"
          << ": " << duration.error();
      }

      slaveRemovalLimiter = new RateLimiter(permits.get(), duration.get());
    }

    // Create anonymous modules.
    foreach (const string& name, ModuleManager::find<Anonymous>()) {
      Try<Anonymous*> create = ModuleManager::create<Anonymous>(name);
      if (create.isError()) {
        // Include the underlying reason so the failure is diagnosable.
        EXIT(EXIT_FAILURE)
          << "Failed to create anonymous module named '" << name << "': "
          << create.error();
      }

      // We don't bother keeping around the pointer to this anonymous
      // module, when we exit that will effectively free its memory.
      //
      // TODO(benh): We might want to add explicit finalization (and
      // maybe explicit initialization too) in order to let the module
      // do any housekeeping necessary when the master is cleanly
      // terminating.
    }

    master = new Master(
        _allocator,
        registrar,
        files,
        contender,
        detector,
        authorizer_,
        slaveRemovalLimiter,
        masterFlags);

    // The standalone detector has no election; point it at our master
    // directly so that agents created below can find it.
    detector->appoint(master->info());
  }

  PID<Master> pid = process::spawn(master);

  garbageCollectors = new vector<GarbageCollector*>();
  statusUpdateManagers = new vector<StatusUpdateManager*>();
  fetchers = new vector<Fetcher*>();
  resourceEstimators = new vector<ResourceEstimator*>();
  qosControllers = new vector<QoSController*>();

  for (int i = 0; i < flags.num_slaves; i++) {
    // Use to propagate necessary flags to agent. Without this, the
    // load call will spit out an error unless their corresponding
    // environment variables explicitly set.
    map<string, string> propagatedFlags;

    // Use a different work/runtime/fetcher-cache directory for each agent.
    propagatedFlags["work_dir"] =
      path::join(flags.work_dir, "agents", stringify(i), "work");

    propagatedFlags["runtime_dir"] =
      path::join(flags.work_dir, "agents", stringify(i), "run");

    propagatedFlags["fetcher_cache_dir"] =
      path::join(flags.work_dir, "agents", stringify(i), "fetch");

    slave::Flags slaveFlags;
    Try<flags::Warnings> load = slaveFlags.load(
        propagatedFlags,
        false,
        "MESOS_");

    if (load.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to start a local cluster while loading"
        << " agent flags from the environment: " << load.error();
    }

    // Log any flag warnings (after logging is initialized).
    foreach (const flags::Warning& warning, load->warnings) {
      LOG(WARNING) << warning.message;
    }

    // Attempt to create the `work_dir` for agent as a convenience.
    Try<Nothing> mkdir = os::mkdir(slaveFlags.work_dir);
    if (mkdir.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create the work_dir for agent '"
        << slaveFlags.work_dir << "': " << mkdir.error();
    }

    // Attempt to create the `runtime_dir` for agent as a convenience.
    mkdir = os::mkdir(slaveFlags.runtime_dir);
    if (mkdir.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create the runtime_dir for agent '"
        << slaveFlags.runtime_dir << "': " << mkdir.error();
    }

    garbageCollectors->push_back(new GarbageCollector(slaveFlags.work_dir));
    statusUpdateManagers->push_back(new StatusUpdateManager(slaveFlags));
    fetchers->push_back(new Fetcher(slaveFlags));

    Try<ResourceEstimator*> resourceEstimator =
      ResourceEstimator::create(slaveFlags.resource_estimator);

    if (resourceEstimator.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create resource estimator: " << resourceEstimator.error();
    }

    resourceEstimators->push_back(resourceEstimator.get());

    Try<QoSController*> qosController =
      QoSController::create(slaveFlags.qos_controller);

    if (qosController.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create QoS Controller: " << qosController.error();
    }

    qosControllers->push_back(qosController.get());

    // Override the default launcher that gets created per agent to
    // 'posix' if we're creating multiple agents because the
    // LinuxLauncher does not support multiple agents on the same host
    // (see MESOS-3793).
    if (flags.num_slaves > 1 && slaveFlags.launcher != "posix") {
      // Every agent loads the same launcher value from the environment,
      // so warn once (for the first agent) instead of once per agent.
      if (i == 0) {
        LOG(WARNING) << "Using the 'posix' launcher instead of '"
                     << slaveFlags.launcher << "' since currently only the "
                     << "'posix' launcher supports multiple agents per host";
      }

      slaveFlags.launcher = "posix";
    }

    // Initialize SecretResolver.
    //
    // NOTE(review): the resolver is not stored in any file-level state
    // visible here, so `shutdown()` never deletes it. Confirm whether the
    // containerizer takes ownership; otherwise this leaks one resolver per
    // agent.
    Try<SecretResolver*> secretResolver =
      mesos::SecretResolver::create(slaveFlags.secret_resolver);

    if (secretResolver.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to initialize secret resolver: " << secretResolver.error();
    }

    Try<Containerizer*> containerizer = Containerizer::create(
        slaveFlags,
        true,
        fetchers->back(),
        secretResolver.get());

    if (containerizer.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to create a containerizer: " << containerizer.error();
    }

    // NOTE: At this point detector is already initialized by the
    // Master.
    Slave* slave = new Slave(
        process::ID::generate("slave"),
        slaveFlags,
        detector,
        containerizer.get(),
        files,
        garbageCollectors->back(),
        statusUpdateManagers->back(),
        resourceEstimators->back(),
        qosControllers->back(),
        authorizer_); // Same authorizer as master.

    // Remember the pair so that `shutdown()` can terminate the agent and
    // delete both objects.
    slaves[containerizer.get()] = slave;

    process::spawn(slave);
  }

  return pid;
}
// Tears down the local cluster started by `launch()`: terminates and waits
// for the master and every agent, then deletes all objects recorded in the
// file-level state and resets that state so `launch()` can be called again.
//
// Does nothing if no cluster is currently running (`master` is null).
void shutdown()
{
  if (master != nullptr) {
    process::terminate(master->self());
    process::wait(master->self());
    delete master;
    master = nullptr;

    // `allocator` is only non-null when `launch()` created the default
    // allocator itself. Reset it like every other global here so that no
    // dangling pointer is left behind after teardown.
    delete allocator;
    allocator = nullptr;

    // TODO(benh): Ugh! Because the isolator calls back into the slave
    // (not the best design) we can't delete the slave until we have
    // deleted the isolator. But since the slave calls into the
    // isolator, we can't delete the isolator until we have stopped
    // the slave.

    foreachpair (Containerizer* containerizer, Slave* slave, slaves) {
      process::terminate(slave->self());
      process::wait(slave->self());
      delete containerizer;
      delete slave;
    }

    slaves.clear();

    // The authorizer was handed to the master and to all agents, so it is
    // only deleted once all of them are gone.
    if (authorizer_.isSome()) {
      delete authorizer_.get();
      authorizer_ = None();
    }

    delete detector;
    detector = nullptr;

    delete contender;
    contender = nullptr;

    delete files;
    files = nullptr;

    // Per-agent helpers: delete every element, then the vector itself.
    foreach (GarbageCollector* gc, *garbageCollectors) {
      delete gc;
    }

    delete garbageCollectors;
    garbageCollectors = nullptr;

    foreach (StatusUpdateManager* statusUpdateManager, *statusUpdateManagers) {
      delete statusUpdateManager;
    }

    delete statusUpdateManagers;
    statusUpdateManagers = nullptr;

    foreach (Fetcher* fetcher, *fetchers) {
      delete fetcher;
    }

    delete fetchers;
    fetchers = nullptr;

    foreach (ResourceEstimator* estimator, *resourceEstimators) {
      delete estimator;
    }

    delete resourceEstimators;
    resourceEstimators = nullptr;

    foreach (QoSController* controller, *qosControllers) {
      delete controller;
    }

    delete qosControllers;
    qosControllers = nullptr;

    // Registry chain, deleted in the reverse order of its construction in
    // `launch()`: registrar, then state, then storage, then the log.
    delete registrar;
    registrar = nullptr;

    delete state;
    state = nullptr;

    delete storage;
    storage = nullptr;

#ifndef __WINDOWS__
    delete log;
    log = nullptr;
#endif // __WINDOWS__
  }
}
} // namespace local {
} // namespace internal {
} // namespace mesos {