blob: 5e87583a9dd1f70d6eb467ba6f430009914017cc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <queue>
#include <string>
#include <mesos/authorizer/acls.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <process/delay.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/uuid.hpp>
#include "common/status_utils.hpp"
#include "examples/flags.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
using namespace mesos::v1;
using std::cerr;
using std::cout;
using std::endl;
using std::queue;
using std::string;
using std::vector;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
constexpr Seconds REFUSE_TIME = Seconds::max();
constexpr char FRAMEWORK_NAME[] =
"CSI User Event Call Scheduler using libprocess (C++)";
/**
* Example framework performing CSI-related operations.
*
* This class implements a framework communicating with Mesos over the v1 HTTP
* API which transform certain disk resources using CSI operations.
*
* It expects to be offered disk resources from resource providers reserved for
* its role (which likely should not be shared by any other framework in the
* cluster). It will then convert `RAW` disk resources into `MOUNT` volumes and
* unreserve these `MOUNT` resources when they are offered again so that
* frameworks in other roles can consume the created disk resource.
*
* The bulk of the framework logic is implemented in `resourceOffers`.
*/
class HTTPScheduler : public process::Process<HTTPScheduler>
{
public:
HTTPScheduler(
const FrameworkInfo& _framework,
const string& _master,
const Option<Credential>& _credential)
: framework(_framework),
master(_master),
credential(_credential),
state(INITIALIZING) {}
~HTTPScheduler() override {}
void connected()
{
doReliableRegistration();
}
void disconnected()
{
state = DISCONNECTED;
}
void received(queue<Event> events)
{
while (!events.empty()) {
Event event = events.front();
events.pop();
switch (event.type()) {
case Event::SUBSCRIBED: {
cout << endl << "Received a SUBSCRIBED event" << endl;
framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
state = SUBSCRIBED;
cout << "Subscribed with ID " << framework.id() << endl;
break;
}
case Event::OFFERS: {
cout << endl << "Received an OFFERS event" << endl;
resourceOffers(google::protobuf::convert(event.offers().offers()));
break;
}
case Event::INVERSE_OFFERS: {
cout << endl << "Received an INVERSE_OFFERS event" << endl;
break;
}
case Event::RESCIND: {
cout << endl << "Received a RESCIND event" << endl;
break;
}
case Event::RESCIND_INVERSE_OFFER: {
cout << endl << "Received a RESCIND_INVERSE_OFFER event" << endl;
break;
}
case Event::UPDATE: {
cout << endl << "Received an UPDATE event" << endl;
cout << "Unexpected UPDATE event: " << event.update().DebugString()
<< endl;
process::terminate(self());
break;
}
// TODO(greggomann): Implement handling of offer operation updates.
case Event::UPDATE_OPERATION_STATUS:
break;
case Event::MESSAGE: {
cout << endl << "Received a MESSAGE event" << endl;
break;
}
case Event::FAILURE: {
cout << endl << "Received a FAILURE event" << endl;
if (event.failure().has_agent_id()) {
// Agent failed.
cout << "Agent '" << event.failure().agent_id().value()
<< "' terminated" << endl;
} else {
cout << "Unexpected FAILURE event: "
<< event.failure().DebugString() << endl;
process::terminate(self());
}
break;
}
case Event::ERROR: {
cout << endl << "Received an ERROR event: "
<< event.error().message() << endl;
process::terminate(self());
break;
}
case Event::HEARTBEAT: {
cout << endl << "Received a HEARTBEAT event" << endl;
break;
}
case Event::UNKNOWN: {
LOG(WARNING) << "Received an UNKNOWN event and ignored";
break;
}
}
}
}
protected:
void initialize() override
{
// We initialize the library here to ensure that callbacks are only invoked
// after the process has spawned.
mesos.reset(
new scheduler::Mesos(
master,
mesos::ContentType::PROTOBUF,
process::defer(self(), &Self::connected),
process::defer(self(), &Self::disconnected),
process::defer(self(), &Self::received, lambda::_1),
credential));
}
private:
void resourceOffers(const vector<Offer>& offers)
{
foreach (const Offer& offer, offers) {
cout << "Received offer " << offer.id() << " with "
<< Resources(offer.resources()) << endl;
// Filter resources and group interesting resources by resource provider.
//
// NOTE: We introduce a typedef here so we can use this type
// with the `foreachvalue` preprocessor macro below.
using ResourcesByType =
hashmap<Resource::DiskInfo::Source::Type, Resources>;
hashmap<ResourceProviderID, ResourcesByType> resourcesByProvider;
constexpr Resource::DiskInfo::Source::Type RAW =
Resource::DiskInfo::Source::RAW;
constexpr Resource::DiskInfo::Source::Type MOUNT =
Resource::DiskInfo::Source::MOUNT;
foreach(const Resource& resource, offer.resources()) {
// Ignore any resources not from a resource provider.
if (!resource.has_provider_id()) {
continue;
}
// We only convert reserved resources.
if (!Resources::isReserved(resource)) {
continue;
}
// We either work on `RAW` or `MOUNT` disk resources. Store
// them by type and ignore any other resources.
if (Resources::isDisk(resource, RAW)) {
resourcesByProvider[resource.provider_id()][RAW] += resource;
} else if (Resources::isDisk(resource, MOUNT)) {
resourcesByProvider[resource.provider_id()][MOUNT] += resource;
}
}
// Create operations.
// Create a single call which is either an accept and contains
// at least one operation, or a decline.
Call call;
CHECK(framework.has_id());
call.mutable_framework_id()->CopyFrom(framework.id());
// We create operations on resources from each resource provider
// separately as most operations currently cannot operate on multiple
// resource providers at once (`LAUNCH` being the obvious exception).
foreachvalue (
const ResourcesByType& resourcesByType,
resourcesByProvider) {
// Iterate over disk resources grouped by disk type since the
// performed operation depends on the type.
foreachpair (
const Resource::DiskInfo::Source::Type& type,
const Resources& resources,
resourcesByType) {
call.set_type(Call::ACCEPT);
Call::Accept* accept = call.mutable_accept();
if (accept->offer_ids().empty()) {
accept->add_offer_ids()->CopyFrom(offer.id());
}
if (type == RAW) {
// We create `MOUNT` volumes out of `RAW` disk resources.
foreach (const Resource& resource, resources) {
cout << "Converting 'RAW' disk to 'MOUNT' disk" << endl;
Offer::Operation* operation = accept->add_operations();
operation->set_type(Offer::Operation::CREATE_DISK);
Offer::Operation::CreateDisk* createDisk =
operation->mutable_create_disk();
createDisk->mutable_source()->CopyFrom(resource);
createDisk->set_target_type(
Resource::DiskInfo::Source::MOUNT);
}
} else if (type == MOUNT) {
// We unreserve `MOUNT` disk resources so they can be
// consumed by frameworks in other roles.
foreach (const Resource& resource, resources) {
cout << "Unreserving 'MOUNT' disk" << endl;
Offer::Operation* operation = accept->add_operations();
operation->set_type(Offer::Operation::UNRESERVE);
Offer::Operation::Unreserve* unreserve =
operation->mutable_unreserve();
unreserve->add_resources()->CopyFrom(resource);
}
}
}
}
// If we did not create operations to accept the offer with decline it.
if (!call.has_accept() || call.accept().operations().empty()) {
cout << "Declining offer" << endl;
call.clear_accept();
call.set_type(Call::DECLINE);
Call::Decline* decline = call.mutable_decline();
decline->add_offer_ids()->CopyFrom(offer.id());
decline->mutable_filters()->set_refuse_seconds(REFUSE_TIME.secs());
}
mesos->send(call);
}
}
void doReliableRegistration()
{
if (state == SUBSCRIBED) {
return;
}
Call call;
if (framework.has_id()) {
call.mutable_framework_id()->CopyFrom(framework.id());
}
call.set_type(Call::SUBSCRIBE);
Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(framework);
mesos->send(call);
process::delay(Seconds(1), self(), &Self::doReliableRegistration);
}
FrameworkInfo framework;
string master;
Option<Credential> credential;
process::Owned<scheduler::Mesos> mesos;
enum State
{
INITIALIZING = 0,
SUBSCRIBED = 1,
DISCONNECTED = 2
} state;
};
class Flags : public virtual mesos::internal::examples::Flags,
public virtual mesos::internal::logging::Flags
{};
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
mesos::internal::logging::initialize(argv[0], true, flags); // Catch signals.
// Log any flag warnings.
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
FrameworkInfo framework;
framework.set_principal(flags.principal);
framework.set_name(FRAMEWORK_NAME);
framework.set_checkpoint(flags.checkpoint);
framework.add_roles(flags.role);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::MULTI_ROLE);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
const Result<string> user = os::user();
CHECK_SOME(user);
framework.set_user(user.get());
Option<Credential> credential = None();
if (flags.authenticate) {
LOG(INFO) << "Enabling authentication for the framework";
Credential credential_;
credential_.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential_.set_secret(flags.secret.get());
}
credential = credential_;
}
if (flags.master == "local") {
// Configure master.
os::setenv(
"MESOS_AUTHENTICATE_HTTP_FRAMEWORKS", stringify(flags.authenticate));
os::setenv("MESOS_HTTP_FRAMEWORK_AUTHENTICATORS", "basic");
mesos::ACLs acls;
mesos::ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
acl->mutable_roles()->add_values("*");
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
}
process::Owned<HTTPScheduler> scheduler(
new HTTPScheduler(framework, flags.master, credential));
process::spawn(scheduler.get());
process::wait(scheduler.get());
return EXIT_SUCCESS;
}