blob: 5aa8762f1aa7c21e93fde9b038828e6ddc2e96f5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdlib.h>
#include <sstream>
#include <vector>
#include <glog/logging.h>
#include <mesos/mesos.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authorizer/acls.hpp>
#include <stout/flags.hpp>
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/uuid.hpp>
#include "common/status_utils.hpp"
#include "examples/flags.hpp"
#include "logging/logging.hpp"
using namespace mesos;
using namespace mesos::internal;
using std::ostringstream;
using std::string;
using std::vector;
constexpr char FRAMEWORK_NAME[] = "Persistent Volume Framework (C++)";
// TODO(jieyu): Currently, persistent volume is only allowed for
// reserved resources.
// Returns the baseline resources (cpus/mem/disk) a shard needs to
// launch a task, reserved for and allocated to the given role.
static Resources SHARD_INITIAL_RESOURCES(const string& role)
{
  Try<Resources> parsed = Resources::parse("cpus:0.1;mem:32;disk:16", role);

  Resources resources = parsed.get();
  resources.allocate(role);

  return resources;
}
// Builds an 8MB persistent-volume disk resource for the given role.
// `persistenceId` identifies the volume, `containerPath` is where it
// is mounted inside the container, `principal` is recorded as the
// creator, and `isShared` marks the volume as a shared resource.
static Resource SHARD_PERSISTENT_VOLUME(
    const string& role,
    const string& persistenceId,
    const string& containerPath,
    const string& principal,
    bool isShared)
{
  // Describe how the volume is exposed inside the container.
  Volume mount;
  mount.set_container_path(containerPath);
  mount.set_mode(Volume::RW);

  Resource::DiskInfo diskInfo;
  auto* persistence = diskInfo.mutable_persistence();
  persistence->set_id(persistenceId);
  persistence->set_principal(principal);
  *diskInfo.mutable_volume() = mount;

  Resource disk = Resources::parse("disk", "8", role).get();
  *disk.mutable_disk() = diskInfo;
  disk.mutable_allocation_info()->set_role(role);

  if (isShared) {
    // Presence of the (empty) `shared` field marks the resource shared.
    disk.mutable_shared();
  }

  return disk;
}
// Wraps the given persistent volumes in a CREATE offer operation.
static Offer::Operation CREATE(const Resources& volumes)
{
  Offer::Operation op;
  op.set_type(Offer::Operation::CREATE);
  *op.mutable_create()->mutable_volumes() = volumes;
  return op;
}
// Wraps the given persistent volumes in a DESTROY offer operation.
static Offer::Operation DESTROY(const Resources& volumes)
{
  Offer::Operation op;
  op.set_type(Offer::Operation::DESTROY);
  *op.mutable_destroy()->mutable_volumes() = volumes;
  return op;
}
// Wraps the given tasks in a LAUNCH offer operation.
static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
{
  Offer::Operation op;
  op.set_type(Offer::Operation::LAUNCH);

  Offer::Operation::Launch* launch = op.mutable_launch();
  for (const TaskInfo& task : tasks) {
    *launch->add_task_infos() = task;
  }

  return op;
}
// The framework launches a task on each registered agent using a
// (possibly shared) persistent volume. In the case of regular
// persistent volumes, the next task is started once the previous
// one terminates; and in the case of shared persistent volumes,
// tasks can use the same shared volume simultaneously. The
// framework terminates once the number of tasks launched on each
// agent reaches a limit.
class PersistentVolumeScheduler : public Scheduler
{
public:
  // Creates `numShards` shards backed by regular persistent volumes
  // and `numSharedShards` shards backed by shared persistent volumes;
  // each shard will launch `tasksPerShard` tasks. The role used for
  // all reservations/volumes is the framework's first role.
  PersistentVolumeScheduler(
      const FrameworkInfo& _frameworkInfo,
      size_t numShards,
      size_t numSharedShards,
      size_t tasksPerShard)
    : frameworkInfo(_frameworkInfo),
      role(_frameworkInfo.roles(0))
  {
    // Initialize the shards using regular persistent volume.
    for (size_t i = 0; i < numShards; i++) {
      shards.push_back(
          Shard(
              "shard-" + stringify(i),
              role,
              tasksPerShard,
              false));
    }

    // Initialize the shards using shared persistent volume.
    for (size_t i = 0; i < numSharedShards; i++) {
      shards.push_back(
          Shard(
              "shared-shard-" + stringify(i),
              role,
              tasksPerShard,
              true));
    }
  }

  // Records the framework ID the master assigned on first registration.
  void registered(
      SchedulerDriver* driver,
      const FrameworkID& frameworkId,
      const MasterInfo& masterInfo) override
  {
    LOG(INFO) << "Registered with master " << masterInfo
              << " and got framework ID " << frameworkId;

    frameworkInfo.mutable_id()->CopyFrom(frameworkId);
  }

  void reregistered(
      SchedulerDriver* driver,
      const MasterInfo& masterInfo) override
  {
    LOG(INFO) << "Reregistered with master " << masterInfo;
  }

  void disconnected(SchedulerDriver* driver) override
  {
    LOG(INFO) << "Disconnected!";
  }

  // Drives the per-shard state machine: for each offer, walks all
  // shards and accumulates CREATE/LAUNCH/DESTROY operations to apply,
  // then accepts the offer with those operations. Stops the driver
  // once every shard is DONE.
  void resourceOffers(
      SchedulerDriver* driver,
      const vector<Offer>& offers) override
  {
    foreach (const Offer& offer, offers) {
      LOG(INFO) << "Received offer " << offer.id() << " from agent "
                << offer.slave_id() << " (" << offer.hostname() << ") "
                << "with " << offer.resources();

      // Running tally of what is still unclaimed in this offer; the
      // INIT case below subtracts the CREATEd volume from it.
      Resources offered = offer.resources();

      // The operation we will perform on the offer.
      vector<Offer::Operation> operations;

      foreach (Shard& shard, shards) {
        switch (shard.state) {
          // INIT: create the shard's persistent volume and launch the
          // writer task (the first task) in the same accept call.
          case Shard::INIT:
            CHECK_EQ(0u, shard.launched);

            if (offered.contains(shard.resources)) {
              Resource volume = SHARD_PERSISTENT_VOLUME(
                  role,
                  id::UUID::random().toString(),
                  "volume",
                  frameworkInfo.principal(),
                  shard.volume.isShared);

              // What the shard's resources look like after the volume
              // is created (plain disk replaced by the volume).
              Try<Resources> resources = shard.resources.apply(CREATE(volume));
              CHECK_SOME(resources);

              TaskInfo task;
              task.set_name(shard.name);
              task.mutable_task_id()->set_value(id::UUID::random().toString());
              task.mutable_slave_id()->CopyFrom(offer.slave_id());
              task.mutable_resources()->CopyFrom(resources.get());

              // TODO(anindya_sinha): Add a flag to allow specifying a
              // custom write command for the consumer task.
              task.mutable_command()->set_value(
                  "echo hello > volume/persisted");

              // Update the shard.
              shard.state = Shard::STAGING;
              shard.taskIds.insert(task.task_id());
              shard.resources = resources.get();
              shard.volume.resource = volume;
              shard.launched++;

              // CREATE must precede the LAUNCH that uses the volume.
              operations.push_back(CREATE(volume));
              operations.push_back(LAUNCH({task}));

              // Account for the CREATE against the remaining offer.
              resources = offered.apply(
                  vector<Offer::Operation>{CREATE(volume)});
              CHECK_SOME(resources);
              offered = resources.get();
            }
            break;

          // STAGING: the volume exists; launch the remaining (reader)
          // tasks until `tasks` have been launched in total.
          case Shard::STAGING:
            CHECK_LE(shard.launched, shard.tasks);

            if (shard.launched == shard.tasks) {
              LOG(INFO) << "All tasks launched, but one or more yet to run";
              break;
            }

            if (offered.contains(shard.resources)) {
              // Set mode to RO for persistent volume resource since we are
              // just reading from the persistent volume.
              CHECK_SOME(shard.volume.resource);
              Resource volume = shard.volume.resource.get();
              Resources taskResources = shard.resources - volume;
              volume.mutable_disk()->mutable_volume()->set_mode(Volume::RO);
              taskResources += volume;

              TaskInfo task;
              task.set_name(shard.name);
              task.mutable_task_id()->set_value(id::UUID::random().toString());
              task.mutable_slave_id()->CopyFrom(offer.slave_id());
              task.mutable_resources()->CopyFrom(taskResources);

              // The read task tries to access the content written in the
              // persistent volume for up to 15 seconds. This is to handle
              // the scenario where the writer task may not have done the
              // write to the volume even though it was launched earlier.
              // TODO(anindya_sinha): Add a flag to allow specifying a
              // custom read command for the consumer task.
              task.mutable_command()->set_value(R"~(
                  COUNTER=0
                  while [ $COUNTER -lt 15 ]; do
                    cat volume/persisted
                    if [ $? -eq 0 ]; then
                      exit 0
                    fi
                    ((COUNTER++))
                    sleep 1
                  done
                  exit 1
                  )~");

              // Update the shard.
              shard.taskIds.insert(task.task_id());
              shard.launched++;

              // Launch the next instance of this task with the already
              // created volume.
              operations.push_back(LAUNCH({task}));
            }
            break;

          // TERMINATING: all tasks finished; destroy the volume. The
          // DESTROY is resent on every offer that still contains the
          // volume; once the volume no longer appears in an offer, the
          // destruction is considered complete.
          case Shard::TERMINATING:
            CHECK_EQ(shard.terminated, shard.launched);
            CHECK_SOME(shard.volume.resource);

            // Send (or resend) DESTROY of the volume here.
            if (offered.contains(shard.volume.resource.get())) {
              operations.push_back(DESTROY(shard.volume.resource.get()));
            } else {
              shard.state = Shard::DONE;
            }
            break;

          case Shard::RUNNING:
          case Shard::DONE:
            // Ignore the offer.
            break;

          default:
            LOG(ERROR) << "Unexpected shard state: " << shard.state;
            driver->abort();
            break;
        }
      }

      // Check the terminal condition.
      bool terminal = true;
      foreach (const Shard& shard, shards) {
        if (shard.state != Shard::DONE) {
          terminal = false;
          break;
        }
      }

      if (terminal) {
        driver->stop();
      }

      // Accept the offer even when no operations were generated (this
      // declines the unused resources back to the master).
      driver->acceptOffers({offer.id()}, operations);
    }
  }

  void offerRescinded(SchedulerDriver* driver, const OfferID& offerId) override
  {
    LOG(INFO) << "Offer " << offerId << " has been rescinded";
  }

  // Advances shard state on task transitions: STAGING -> RUNNING when
  // the last task starts running, RUNNING -> TERMINATING when the last
  // task finishes. Any unexpected terminal state (e.g. TASK_FAILED)
  // hits the default branch and aborts the driver.
  void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) override
  {
    LOG(INFO) << "Task '" << status.task_id() << "' is in state "
              << status.state();

    foreach (Shard& shard, shards) {
      if (shard.taskIds.contains(status.task_id())) {
        switch (status.state()) {
          case TASK_RUNNING:
            CHECK(shard.launched <= shard.tasks);
            if (shard.launched == shard.tasks &&
                shard.state == Shard::STAGING) {
              shard.state = Shard::RUNNING;
            }
            break;
          case TASK_FINISHED:
            ++shard.terminated;
            CHECK(shard.terminated <= shard.tasks);
            if (shard.terminated == shard.tasks &&
                shard.state == Shard::RUNNING) {
              shard.state = Shard::TERMINATING;
            }
            break;
          case TASK_STAGING:
          case TASK_STARTING:
            // Ignore the status update.
            break;
          default:
            LOG(ERROR) << "Unexpected task state " << status.state()
                       << " for task '" << status.task_id() << "'";
            driver->abort();
            break;
        }
        // A task ID belongs to exactly one shard; stop searching.
        break;
      }
    }
  }

  void frameworkMessage(
      SchedulerDriver* driver,
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      const string& data) override
  {
    LOG(INFO) << "Received framework message from executor '" << executorId
              << "' on agent " << slaveId << ": '" << data << "'";
  }

  void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId) override
  {
    LOG(INFO) << "Lost agent " << slaveId;
  }

  void executorLost(
      SchedulerDriver* driver,
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      int status) override
  {
    LOG(INFO) << "Lost executor '" << executorId << "' on agent "
              << slaveId << ", " << WSTRINGIFY(status);
  }

  void error(SchedulerDriver* driver, const string& message) override
  {
    LOG(ERROR) << message;
  }

private:
  struct Shard
  {
    // States that the state machine for each shard goes through.
    //
    // The state machine per shard runs as follows:
    //  1. The shard is STAGING when it launches a writer task that writes
    //     to the persistent volume.
    //  2. The shard launches one or more reader tasks that read the same file
    //     from the persistent volume to confirm the write operation. Once all
    //     tasks are launched, the shard is RUNNING.
    //  3. When all tasks finish, the shard starts TERMINATING by destroying
    //     the persistent volume.
    //  4. Once the persistent volume is successfully destroyed, the shard
    //     is DONE and terminates.
    //
    // In the case of regular persistent volumes, the tasks run in sequence;
    // whereas in the case of shared persistent volumes, the tasks run in
    // parallel.
    enum State
    {
      INIT = 0,    // The shard is in initialized state.
      STAGING,     // The shard is awaiting offers to launch more tasks.
      RUNNING,     // The shard is running (i.e., all tasks have
                   // successfully launched).
      TERMINATING, // All tasks are finished and needs cleanup.
      DONE,        // The shard has finished cleaning up.
    };

    // The persistent volume associated with this shard.
    struct Volume
    {
      explicit Volume(bool _isShared) : isShared(_isShared) {}

      // `Resource` object for this volume.
      Option<Resource> resource;

      // Flag to indicate if this is a regular or shared persistent volume.
      bool isShared;
    };

    Shard(
        const string& _name,
        const string& role,
        size_t _tasks,
        bool isShared)
      : name(_name),
        state(INIT),
        volume(isShared),
        resources(SHARD_INITIAL_RESOURCES(role)),
        launched(0),
        terminated(0),
        tasks(_tasks) {}

    string name;
    State state;             // The current state of this shard.
    hashset<TaskID> taskIds; // The IDs of the tasks running on this shard.
    Volume volume;           // The persistent volume associated with the shard.
    Resources resources;     // Resources required to launch the shard.
    size_t launched;         // How many tasks this shard has launched.
    size_t terminated;       // How many tasks this shard has terminated.
    size_t tasks;            // How many tasks this shard should launch.
  };

  FrameworkInfo frameworkInfo;
  const string role;
  vector<Shard> shards;
};
// Command-line flags for this example, layered on top of the common
// example flags (master, role, principal, authentication, etc.).
class Flags : public virtual mesos::internal::examples::Flags
{
public:
  Flags()
  {
    add(&Flags::num_shards,
        "num_shards",
        "The number of shards the framework will run using regular volume.",
        2);

    add(&Flags::num_shared_shards,
        "num_shared_shards",
        "The number of shards the framework will run using shared volume.",
        2);

    add(&Flags::tasks_per_shard,
        "tasks_per_shard",
        "The number of tasks should be launched per shard.",
        2);
  }

  // Shards backed by regular (exclusive) persistent volumes (default 2).
  size_t num_shards;

  // Shards backed by shared persistent volumes (default 2).
  size_t num_shared_shards;

  // Tasks each shard launches before it is torn down (default 2).
  size_t tasks_per_shard;
};
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (flags.help) {
std::cout << flags.usage() << std::endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
std::cerr << flags.usage(load.error()) << std::endl;
return EXIT_FAILURE;
}
logging::initialize(argv[0], true, flags); // Catch signals.
// Log any flag warnings (after logging is initialized).
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
if (flags.role == "*") {
EXIT(EXIT_FAILURE)
<< "Role is incorrect; the default '*' role cannot be used";
}
FrameworkInfo framework;
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_principal(flags.principal);
framework.set_name(FRAMEWORK_NAME);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::MULTI_ROLE);
framework.set_checkpoint(flags.checkpoint);
framework.add_roles(flags.role);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::SHARED_RESOURCES);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
if (flags.master == "local") {
// Configure master.
os::setenv("MESOS_ROLES", flags.role);
os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate));
ACLs acls;
ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(ACL::Entity::ANY);
acl->mutable_roles()->add_values(flags.role);
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
// Configure agent.
os::setenv("MESOS_DEFAULT_ROLE", flags.role);
}
MesosSchedulerDriver* driver;
PersistentVolumeScheduler scheduler(
framework,
flags.num_shards,
flags.num_shared_shards,
flags.tasks_per_shard);
if (flags.authenticate) {
LOG(INFO) << "Enabling authentication for the framework";
Credential credential;
credential.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential.set_secret(flags.secret.get());
}
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master,
credential);
} else {
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master);
}
int status = driver->run() == DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
driver->stop();
delete driver;
return status;
}