blob: 4218b1563e10aaefe9abcdc20c90c13651959790 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdlib.h>
#include <sstream>
#include <vector>
#include <glog/logging.h>
#include <mesos/mesos.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <stout/flags.hpp>
#include <stout/format.hpp>
#include <stout/json.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/uuid.hpp>
#include "common/status_utils.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
using namespace mesos;
using namespace mesos::internal;
using std::cerr;
using std::cout;
using std::endl;
using std::ostringstream;
using std::string;
using std::vector;
// TODO(jieyu): Currently, persistent volume is only allowed for
// reserved resources.
// Returns the resources each shard needs to launch one task: a small
// slice of cpu, memory, and disk, all under the given role (persistent
// volumes are currently only allowed on reserved resources).
static Resources SHARD_INITIAL_RESOURCES(const string& role)
{
  Try<Resources> parsed =
    Resources::parse("cpus:0.1;mem:32;disk:16", role);

  // Parsing a constant literal cannot fail.
  CHECK_SOME(parsed);

  return parsed.get();
}
// Constructs an 8MB persistent disk resource under the given role,
// tagged with 'persistenceId' and mounted read-write at
// 'containerPath' inside the task's sandbox.
static Resource SHARD_PERSISTENT_VOLUME(
    const string& role,
    const string& persistenceId,
    const string& containerPath)
{
  Resource resource = Resources::parse("disk", "8", role).get();

  // Fill in the disk info in place rather than building separate
  // messages and copying them over.
  Resource::DiskInfo* disk = resource.mutable_disk();
  disk->mutable_persistence()->set_id(persistenceId);

  Volume* volume = disk->mutable_volume();
  volume->set_container_path(containerPath);
  volume->set_mode(Volume::RW);

  return resource;
}
// Wraps the given volumes in a CREATE offer operation.
static Offer::Operation CREATE(const Resources& volumes)
{
  Offer::Operation op;
  op.mutable_create()->mutable_volumes()->CopyFrom(volumes);
  op.set_type(Offer::Operation::CREATE);
  return op;
}
// Wraps the given tasks in a LAUNCH offer operation.
static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
{
  Offer::Operation op;
  op.set_type(Offer::Operation::LAUNCH);

  for (const TaskInfo& task : tasks) {
    *op.mutable_launch()->add_task_infos() = task;
  }

  return op;
}
// The framework launches a task on each registered slave using a
// persistent volume. It restarts the task once the previous one on
// the slave finishes. The framework terminates once the number of
// tasks launched on each slave reaches a limit.
class PersistentVolumeScheduler : public Scheduler
{
public:
  // Creates 'numShards' shards, each of which will sequentially run
  // 'tasksPerShard' tasks sharing one persistent volume, all under
  // the framework's role.
  PersistentVolumeScheduler(
      const FrameworkInfo& _frameworkInfo,
      size_t numShards,
      size_t tasksPerShard)
    : frameworkInfo(_frameworkInfo)
  {
    for (size_t i = 0; i < numShards; i++) {
      shards.push_back(Shard(
          "shard-" + stringify(i),
          frameworkInfo.role(),
          tasksPerShard));
    }
  }

  // Initial registration: remember the framework ID the master
  // assigned so subsequent re-registrations reuse it.
  virtual void registered(
      SchedulerDriver* driver,
      const FrameworkID& frameworkId,
      const MasterInfo& masterInfo)
  {
    LOG(INFO) << "Registered with master " << masterInfo
              << " and got framework ID " << frameworkId;

    frameworkInfo.mutable_id()->CopyFrom(frameworkId);
  }

  virtual void reregistered(
      SchedulerDriver* driver,
      const MasterInfo& masterInfo)
  {
    LOG(INFO) << "Reregistered with master " << masterInfo;
  }

  virtual void disconnected(
      SchedulerDriver* driver)
  {
    LOG(INFO) << "Disconnected!";
  }

  // For each offer, walk all shards and greedily pack as many shard
  // operations (volume creation + task launches) into the offer as
  // its resources allow; the accumulated operations are accepted in a
  // single acceptOffers() call per offer.
  virtual void resourceOffers(
      SchedulerDriver* driver,
      const vector<Offer>& offers)
  {
    foreach (const Offer& offer, offers) {
      LOG(INFO) << "Received offer " << offer.id() << " from slave "
                << offer.slave_id() << " (" << offer.hostname() << ") "
                << "with " << offer.resources();

      // Resources remaining in this offer; decremented as shards
      // claim them below.
      Resources offered = offer.resources();

      // The operation we will perform on the offer.
      vector<Offer::Operation> operations;

      foreach (Shard& shard, shards) {
        switch (shard.state) {
          case Shard::INIT:
            // First launch for this shard: create its persistent
            // volume and start the initial task on it.
            if (offered.contains(shard.resources)) {
              Resource volume = SHARD_PERSISTENT_VOLUME(
                  frameworkInfo.role(),
                  UUID::random().toString(),
                  "volume");

              // shard.resources with the 8MB volume carved out of
              // the shard's disk allocation.
              Try<Resources> resources = shard.resources.apply(CREATE(volume));
              CHECK_SOME(resources);

              TaskInfo task;
              task.set_name(shard.name);
              task.mutable_task_id()->set_value(UUID::random().toString());
              task.mutable_slave_id()->CopyFrom(offer.slave_id());
              task.mutable_resources()->CopyFrom(resources.get());
              // Write a marker file into the volume; follow-up tasks
              // (WAITING state) verify it survived.
              task.mutable_command()->set_value("touch volume/persisted");

              // Update the shard.
              shard.state = Shard::STAGING;
              shard.taskId = task.task_id();
              shard.volume.id = volume.disk().persistence().id();
              shard.volume.slave = offer.slave_id().value();
              shard.resources = resources.get();
              shard.launched++;

              operations.push_back(CREATE(volume));
              operations.push_back(LAUNCH({task}));

              // Deduct what this shard consumed so later shards only
              // see what is actually left of the offer.
              resources = offered.apply(vector<Offer::Operation>{
                  CREATE(volume),
                  LAUNCH({task})});

              CHECK_SOME(resources);
              offered = resources.get();
            }
            break;
          case Shard::WAITING:
            // Re-launch on the same slave (the volume lives there),
            // this time checking that the persisted file is present.
            if (offered.contains(shard.resources)) {
              CHECK_EQ(shard.volume.slave, offer.slave_id().value());

              TaskInfo task;
              task.set_name(shard.name);
              task.mutable_task_id()->set_value(UUID::random().toString());
              task.mutable_slave_id()->CopyFrom(offer.slave_id());
              task.mutable_resources()->CopyFrom(shard.resources);
              task.mutable_command()->set_value("test -f volume/persisted");

              // Update the shard.
              shard.state = Shard::STAGING;
              shard.taskId = task.task_id();
              shard.launched++;

              operations.push_back(LAUNCH({task}));
            }
            break;
          case Shard::STAGING:
          case Shard::RUNNING:
          case Shard::DONE:
            // Ignore the offer.
            break;
          default:
            LOG(ERROR) << "Unexpected shard state: " << shard.state;
            driver->abort();
            break;
        }
      }

      // An empty 'operations' implicitly declines the offer.
      driver->acceptOffers({offer.id()}, operations);
    }
  }

  virtual void offerRescinded(
      SchedulerDriver* driver,
      const OfferID& offerId)
  {
    LOG(INFO) << "Offer " << offerId << " has been rescinded";
  }

  // Advances the owning shard's state machine on task transitions and
  // stops the driver once every shard is DONE.
  virtual void statusUpdate(
      SchedulerDriver* driver,
      const TaskStatus& status)
  {
    LOG(INFO) << "Task '" << status.task_id() << "' is in state "
              << status.state();

    foreach (Shard& shard, shards) {
      if (shard.taskId == status.task_id()) {
        switch (status.state()) {
          case TASK_RUNNING:
            shard.state = Shard::RUNNING;
            break;
          case TASK_FINISHED:
            // WAITING triggers a re-launch on the next matching offer
            // (see resourceOffers); DONE retires the shard.
            if (shard.launched >= shard.tasks) {
              shard.state = Shard::DONE;
            } else {
              shard.state = Shard::WAITING;
            }
            break;
          case TASK_STAGING:
          case TASK_STARTING:
            // Ignore the status update.
            break;
          default:
            // Any failure state (LOST, FAILED, KILLED, ...) aborts
            // the whole framework.
            LOG(ERROR) << "Unexpected task state " << status.state()
                       << " for task '" << status.task_id() << "'";
            driver->abort();
            break;
        }
        break;
      }
    }

    // Check the terminal condition.
    bool terminal = true;
    foreach (const Shard& shard, shards) {
      if (shard.state != Shard::DONE) {
        terminal = false;
        break;
      }
    }

    if (terminal) {
      driver->stop();
    }
  }

  virtual void frameworkMessage(
      SchedulerDriver* driver,
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      const string& data)
  {
    LOG(INFO) << "Received framework message from executor '" << executorId
              << "' on slave " << slaveId << ": '" << data << "'";
  }

  virtual void slaveLost(
      SchedulerDriver* driver,
      const SlaveID& slaveId)
  {
    LOG(INFO) << "Lost slave " << slaveId;
  }

  virtual void executorLost(
      SchedulerDriver* driver,
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      int status)
  {
    LOG(INFO) << "Lost executor '" << executorId << "' on slave "
              << slaveId << ", " << WSTRINGIFY(status);
  }

  virtual void error(
      SchedulerDriver* driver,
      const string& message)
  {
    LOG(ERROR) << message;
  }

private:
  // Per-shard bookkeeping: one shard pins one persistent volume on
  // one slave and runs its tasks there sequentially.
  struct Shard
  {
    enum State
    {
      INIT = 0, // The shard hasn't been launched yet.
      STAGING,  // The shard has been launched.
      RUNNING,  // The shard is running.
      WAITING,  // The shard is waiting to be re-launched.
      DONE,     // The shard has finished all tasks.

      // TODO(jieyu): Add another state so that we can track the
      // destroy of the volume once all tasks finish.
    };

    // The persistent volume associated with this shard.
    struct Volume
    {
      // The persistence ID.
      string id;

      // An identifier used to uniquely identify a slave (even across
      // reboot). In the test, we use the slave ID since slaves will not
      // be rebooted. Note that we cannot use hostname as the identifier
      // in a local cluster because all slaves share the same hostname.
      string slave;
    };

    Shard(const string& _name, const string& role, size_t _tasks)
      : name(_name),
        state(INIT),
        resources(SHARD_INITIAL_RESOURCES(role)),
        launched(0),
        tasks(_tasks) {}

    string name;
    State state;          // The current state of this shard.
    TaskID taskId;        // The ID of the current task.
    Volume volume;        // The persistent volume associated with the shard.
    Resources resources;  // Resources required to launch the shard.
    size_t launched;      // How many tasks this shard has launched.
    size_t tasks;         // How many tasks this shard should launch.
  };

  FrameworkInfo frameworkInfo;
  vector<Shard> shards;
};
// Command line flags for this framework, layered on top of the
// standard logging flags.
class Flags : public logging::Flags
{
public:
  Flags()
  {
    add(&master,
        "master",
        "The master to connect to. May be one of:\n"
        "  master@addr:port (The PID of the master)\n"
        "  zk://host1:port1,host2:port2,.../path\n"
        "  zk://username:password@host1:port1,host2:port2,.../path\n"
        "  file://path/to/file (where file contains one of the above)");

    add(&role,
        "role",
        "Role to use when registering",
        "test");

    add(&principal,
        "principal",
        "The principal used to identify this framework",
        "test");

    add(&num_shards,
        "num_shards",
        "The number of shards the framework will run.",
        3);

    add(&tasks_per_shard,
        "tasks_per_shard",
        "The number of tasks should be launched per shard.",
        3);
  }

  Option<string> master;  // Required; no default.
  string role;            // Defaults to "test".
  string principal;       // Defaults to "test".
  size_t num_shards;      // Defaults to 3.
  size_t tasks_per_shard; // Defaults to 3.
};
// Entry point: parses flags, optionally configures a local
// master/slave via environment variables, then runs the scheduler
// driver until all shards finish (EXIT_SUCCESS) or the driver
// aborts/fails (EXIT_FAILURE).
int main(int argc, char** argv)
{
  Flags flags;
  Try<Nothing> load = flags.load("MESOS_", argc, argv);

  if (load.isError()) {
    cerr << flags.usage(load.error()) << endl;
    return EXIT_FAILURE;
  }

  if (flags.help) {
    cout << flags.usage() << endl;
    return EXIT_SUCCESS;
  }

  if (flags.master.isNone()) {
    cerr << flags.usage("Missing required option --master") << endl;
    return EXIT_FAILURE;
  }

  logging::initialize(argv[0], flags, true); // Catch signals.

  FrameworkInfo framework;
  framework.set_user(""); // Have Mesos fill in the current user.
  framework.set_name("Persistent Volume Framework (C++)");
  framework.set_role(flags.role);
  framework.set_checkpoint(true);
  framework.set_principal(flags.principal);

  if (flags.master.get() == "local") {
    // Configure master: make the framework's role known and permit
    // any principal to register with it, without authentication.
    os::setenv("MESOS_ROLES", flags.role);
    os::setenv("MESOS_AUTHENTICATE", "false");

    ACLs acls;
    ACL::RegisterFramework* acl = acls.add_register_frameworks();
    acl->mutable_principals()->set_type(ACL::Entity::ANY);
    acl->mutable_roles()->add_values(flags.role);
    os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));

    // Configure slave.
    os::setenv("MESOS_DEFAULT_ROLE", flags.role);
  }

  PersistentVolumeScheduler scheduler(
      framework,
      flags.num_shards,
      flags.tasks_per_shard);

  // Stack-allocate the driver (RAII) instead of raw new/delete so it
  // cannot leak on any exit path; behavior is otherwise unchanged.
  MesosSchedulerDriver driver(&scheduler, framework, flags.master.get());

  int status = driver.run() == DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;

  // Ensure that the driver process terminates before the destructor
  // runs (run() may return without stop() having been called).
  driver.stop();

  return status;
}