// blob: 9db4dd18c1aefca24cfa66d762994ac59ed2ed3d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <map>
#include <set>
#include <sstream>
#include <vector>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/try.hpp>
#include <stout/strings.hpp>
#include "authorizer/authorizer.hpp"
#include "common/protobuf_utils.hpp"
#include "local.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/allocator.hpp"
#include "master/contender.hpp"
#include "master/detector.hpp"
#include "master/drf_sorter.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
#include "master/repairer.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/slave.hpp"
#include "state/in_memory.hpp"
#include "state/log.hpp"
#include "state/protobuf.hpp"
#include "state/storage.hpp"
using namespace mesos::internal;
using namespace mesos::internal::log;
using mesos::internal::master::allocator::Allocator;
using mesos::internal::master::allocator::AllocatorProcess;
using mesos::internal::master::allocator::DRFSorter;
using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
using mesos::internal::master::Master;
using mesos::internal::master::Registrar;
using mesos::internal::master::Repairer;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Slave;
using process::Owned;
using process::PID;
using process::UPID;
using std::map;
using std::set;
using std::string;
using std::stringstream;
using std::vector;
namespace mesos {
namespace internal {
namespace local {
// File-scope state describing the single local cluster. All of it is
// populated by launch() and released by shutdown().

// Default allocator and its backing process. launch() creates them only
// when the caller passes a NULL allocator; otherwise both are set to NULL
// and the caller's allocator is used instead.
static Allocator* allocator = NULL;
static AllocatorProcess* allocatorProcess = NULL;

// Replicated log; only created when the master's registry flag is
// "replicated_log", otherwise it stays NULL.
static Log* log = NULL;

// Registry storage backend (in-memory or log-backed, depending on the
// master's registry flag) and the protobuf state built on top of it.
static state::Storage* storage = NULL;
static state::protobuf::State* state = NULL;

// Components handed to the Master constructor.
static Registrar* registrar = NULL;
static Repairer* repairer = NULL;

// The one master of the local cluster. A non-NULL value is what launch()
// and shutdown() use to tell whether a cluster is currently running.
static Master* master = NULL;

// Every launched slave, keyed by the containerizer that was created for it.
// Both key and value are deleted in shutdown().
static map<Containerizer*, Slave*> slaves;

// Standalone detector/contender pair. The detector is appointed with the
// master's info in launch() and is also passed to every slave.
static StandaloneMasterDetector* detector = NULL;
static MasterContender* contender = NULL;

// Set only when the master flags carry ACLs; None() otherwise.
static Option<Authorizer*> authorizer = None();

// Files instance passed to both the master and every slave.
static Files* files = NULL;
// Launches an in-process ("local") cluster consisting of one master and
// 'flags.num_slaves' slaves, and returns the PID of the spawned master.
//
// Parameters:
//   flags       Local cluster flags; only 'num_slaves' is read here.
//   _allocator  Allocator handed to the master. When NULL, a default
//               HierarchicalDRFAllocatorProcess-backed allocator is created
//               and remembered so that shutdown() can delete it. When
//               non-NULL, the caller's allocator is used as-is and the
//               file-scope allocator pointers are set to NULL.
//
// Master and slave flags are NOT taken from 'flags'; they are loaded from
// the environment using the "MESOS_" prefix. Failures while loading flags,
// selecting the registry backend, creating the authorizer or creating a
// containerizer go through EXIT(1). Calling launch() while a master is
// already set is reported through LOG(FATAL).
PID<Master> launch(const Flags& flags, Allocator* _allocator)
{
  if (master != NULL) {
    LOG(FATAL) << "Can only launch one local cluster at a time (for now)";
  }

  if (_allocator == NULL) {
    // Create default allocator, save it for deleting later.
    allocatorProcess = new HierarchicalDRFAllocatorProcess();
    _allocator = allocator = new Allocator(allocatorProcess);
  } else {
    // TODO(benh): Figure out the behavior of allocator pointer and remove the
    // else block.
    allocator = NULL;
    allocatorProcess = NULL;
  }

  files = new Files();

  {
    // Named distinctly from the 'flags' parameter so the two cannot be
    // confused.
    master::Flags masterFlags;

    Try<Nothing> load = masterFlags.load("MESOS_");
    if (load.isError()) {
      EXIT(1) << "Failed to start a local cluster while loading "
              << "master flags from the environment: " << load.error();
    }

    if (masterFlags.registry == "in_memory") {
      if (masterFlags.registry_strict) {
        EXIT(1) << "Cannot use '--registry_strict' when using in-memory storage"
                << " based registry";
      }
      storage = new state::InMemoryStorage();
    } else if (masterFlags.registry == "replicated_log") {
      // For local runs, we use a temporary work directory.
      if (masterFlags.work_dir.isNone()) {
        CHECK_SOME(os::mkdir("/tmp/mesos/local"));

        Try<string> directory = os::mkdtemp("/tmp/mesos/local/XXXXXX");
        CHECK_SOME(directory);
        masterFlags.work_dir = directory.get();
      }

      // TODO(vinod): Add support for replicated log with ZooKeeper.
      log = new Log(
          1,
          path::join(masterFlags.work_dir.get(), "replicated_log"),
          set<UPID>(),
          masterFlags.log_auto_initialize);
      storage = new state::LogStorage(log);
    } else {
      EXIT(1) << "'" << masterFlags.registry << "' is not a supported"
              << " option for registry persistence";
    }

    CHECK_NOTNULL(storage);

    state = new state::protobuf::State(storage);
    registrar = new Registrar(masterFlags, state);
    repairer = new Repairer();

    contender = new StandaloneMasterContender();
    detector = new StandaloneMasterDetector();

    if (masterFlags.acls.isSome()) {
      Try<Owned<Authorizer> > authorizer_ =
        Authorizer::create(masterFlags.acls.get());

      if (authorizer_.isError()) {
        EXIT(1) << "Failed to initialize the authorizer: "
                << authorizer_.error() << " (see --acls flag)";
      }

      // Take over the raw pointer; it is deleted explicitly in shutdown().
      Owned<Authorizer> authorizer__ = authorizer_.get();
      authorizer = authorizer__.release();
    }

    master = new Master(
        _allocator,
        registrar,
        repairer,
        files,
        contender,
        detector,
        authorizer,
        masterFlags);

    // Make the standalone detector report this master before any slave
    // is created with it.
    detector->appoint(master->info());
  }

  PID<Master> pid = process::spawn(master);

  for (int i = 0; i < flags.num_slaves; i++) {
    slave::Flags slaveFlags;

    Try<Nothing> load = slaveFlags.load("MESOS_");
    if (load.isError()) {
      EXIT(1) << "Failed to start a local cluster while loading "
              << "slave flags from the environment: " << load.error();
    }

    // NOTE(review): the containerizer is created before the per-slave
    // work directory is applied below, so it is given the unsuffixed
    // 'work_dir' -- confirm this ordering is intended.
    Try<Containerizer*> containerizer =
      Containerizer::create(slaveFlags, true);

    if (containerizer.isError()) {
      EXIT(1) << "Failed to create a containerizer: " << containerizer.error();
    }

    // Use a different work directory for each slave.
    slaveFlags.work_dir = path::join(slaveFlags.work_dir, stringify(i));

    // NOTE: At this point detector is already initialized by the
    // Master.
    Slave* slave =
      new Slave(slaveFlags, detector, containerizer.get(), files);

    // Remember the pair so shutdown() can stop and delete both.
    slaves[containerizer.get()] = slave;

    process::spawn(slave);
  }

  return pid;
}
// Tears down the cluster started by launch(): stops and deletes the
// master, then every slave together with its containerizer, then the
// remaining supporting objects. Every file-scope pointer is reset after
// deletion so a later launch() starts from a clean state.
//
// Does nothing when no master is set (i.e. nothing was launched, or
// shutdown() already ran).
void shutdown()
{
  if (master != NULL) {
    process::terminate(master->self());
    process::wait(master->self());
    delete master;
    master = NULL;

    // Both are NULL when launch() was given a caller-supplied allocator,
    // in which case these deletes are no-ops. Reset them like every other
    // global so no dangling pointer survives the shutdown.
    delete allocator;
    allocator = NULL;
    delete allocatorProcess;
    allocatorProcess = NULL;

    // TODO(benh): Ugh! Because the isolator calls back into the slave
    // (not the best design) we can't delete the slave until we have
    // deleted the isolator. But since the slave calls into the
    // isolator, we can't delete the isolator until we have stopped
    // the slave.
    foreachpair (Containerizer* containerizer, Slave* slave, slaves) {
      process::terminate(slave->self());
      process::wait(slave->self());
      delete containerizer;
      delete slave;
    }

    slaves.clear();

    if (authorizer.isSome()) {
      delete authorizer.get();
      authorizer = None();
    }

    delete detector;
    detector = NULL;

    delete contender;
    contender = NULL;

    delete files;
    files = NULL;

    delete registrar;
    registrar = NULL;

    delete repairer;
    repairer = NULL;

    delete state;
    state = NULL;

    delete storage;
    storage = NULL;

    // NULL when the in-memory registry was used; deleting NULL is a no-op.
    delete log;
    log = NULL;
  }
}
} // namespace local {
} // namespace internal {
} // namespace mesos {