blob: 360d6c6dc660aedb0c00d1717c34a2a6170761bc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <queue>
#include <string>
#include <vector>
#include <mesos/authorizer/acls.hpp>
#include <mesos/v1/mesos.hpp>
#include <mesos/v1/resources.hpp>
#include <mesos/v1/scheduler.hpp>
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/time.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/pull_gauge.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include "examples/flags.hpp"
#include "logging/logging.hpp"
using namespace mesos::v1;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using process::Clock;
using process::defer;
using process::http::OK;
using process::metrics::Counter;
using process::metrics::PullGauge;
// Amounts requested for every sleep task; these are spliced into the
// "cpus:...;mem:..." resource string when a task is built.
constexpr float CPUS_PER_TASK = 0.2f;
constexpr int32_t MEM_PER_TASK = 32;

// Name the framework registers under.
constexpr char FRAMEWORK_NAME[] = "Inverse Offer Framework (C++)";

// Every metric key exported by this framework starts with this prefix.
constexpr char FRAMEWORK_METRICS_PREFIX[] = "inverse_offer_framework";
// Holds a sleep task and when the task's machine is scheduled for maintenance.
// One record is kept per agent that currently runs (or is being migrated
// away from) a sleep task.
struct SleeperInfo
{
// ID of the sleep task launched on the agent.
TaskID taskId;
// Start of the agent's planned unavailability, copied from the offer or
// inverse offer that reported it. It stays default-constructed when the
// launching offer carried no unavailability; the scheduler only treats a
// strictly positive `nanoseconds()` value as scheduled maintenance.
TimeInfo unavailability;
};
// This scheduler launches and maintains a configurable number of
// infinite-sleep tasks, placing at most one task on a single agent.
// When the operator schedules maintenance on the cluster, the scheduler
// will respond by migrating sleep tasks ahead of the planned maintenance.
//
// Bookkeeping is kept in two maps keyed by agent ID:
//   - `sleepers`:  agents that currently hold an active sleep task.
//   - `migrating`: agents whose sleep task has been asked to die because
//                  a replacement was started on a safer agent.
class InverseOfferScheduler : public process::Process<InverseOfferScheduler>
{
public:
  // Stores the configuration and registers the metrics. The scheduler
  // library itself is only created in `initialize()`.
  //
  //   _framework:  framework description sent when subscribing.
  //   _master:     master address handed to the scheduler library.
  //   _num_tasks:  number of sleep tasks to keep running.
  //   _credential: optional credential handed to the scheduler library.
  InverseOfferScheduler(
      const FrameworkInfo& _framework,
      const std::string& _master,
      const uint32_t _num_tasks,
      const Option<Credential>& _credential)
    : framework(_framework),
      master(_master),
      num_tasks(_num_tasks),
      credential(_credential),
      tasks_launched(0),
      state(DISCONNECTED),
      metrics(*this)
  {
    start_time = Clock::now();
  }

  ~InverseOfferScheduler() override {}

protected:
  void initialize() override
  {
    // We initialize the library here to ensure that callbacks are only invoked
    // after the process has spawned.
    mesos.reset(new scheduler::Mesos(
        master,
        mesos::ContentType::PROTOBUF,
        process::defer(self(), &Self::connected),
        process::defer(self(), &Self::disconnected),
        process::defer(self(), &Self::received, lambda::_1),
        credential));
  }

  // Callback for an established connection: starts (re-)subscribing.
  void connected()
  {
    state = CONNECTED;

    doReliableRegistration();
  }

  // Callback for a lost connection. Also stops any pending
  // `doReliableRegistration` retry, which bails out when DISCONNECTED.
  void disconnected()
  {
    LOG(INFO) << "Disconnected!";

    state = DISCONNECTED;
  }

  // Sends a SUBSCRIBE call and re-schedules itself one second later.
  // The retry chain ends once the scheduler is either subscribed or
  // disconnected.
  void doReliableRegistration()
  {
    if (state == SUBSCRIBED || state == DISCONNECTED) {
      return;
    }

    Call call;
    call.set_type(Call::SUBSCRIBE);

    // Only present after a previous successful subscription.
    if (framework.has_id()) {
      call.mutable_framework_id()->CopyFrom(framework.id());
    }

    Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(framework);

    mesos->send(call);

    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
  }

  // Drains the queue of events delivered by the scheduler library and
  // dispatches each one to the matching handler.
  void received(std::queue<Event> events)
  {
    while (!events.empty()) {
      Event event = events.front();
      events.pop();

      LOG(INFO) << "Received " << event.type() << " event";

      switch (event.type()) {
        case Event::SUBSCRIBED: {
          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());

          LOG(INFO) << "Subscribed with ID '" << framework.id() << "'";

          state = SUBSCRIBED;
          break;
        }

        case Event::OFFERS: {
          metrics.offers_received += event.offers().offers().size();
          resourceOffers(google::protobuf::convert(event.offers().offers()));
          break;
        }

        case Event::INVERSE_OFFERS: {
          metrics.inverse_offers_received +=
            event.inverse_offers().inverse_offers().size();

          inverseOffers(google::protobuf::convert(
              event.inverse_offers().inverse_offers()));
          break;
        }

        case Event::UPDATE: {
          statusUpdate(event.update().status());
          break;
        }

        // TODO(greggomann): Implement handling of operation status updates.
        case Event::UPDATE_OPERATION_STATUS:
          break;

        case Event::FAILURE: {
          const Event::Failure& failure = event.failure();

          if (failure.has_agent_id() && failure.has_executor_id()) {
            // The agent ID is always closed with a quote, whether or not
            // a status follows.
            LOG(INFO)
              << "Executor '" << failure.executor_id()
              << "' lost on agent '" << failure.agent_id() << "'"
              << (failure.has_status() ?
                  " with status: " + stringify(failure.status()) : "");
          } else {
            CHECK(failure.has_agent_id());

            LOG(INFO) << "Agent lost: " << failure.agent_id();
          }
          break;
        }

        case Event::ERROR: {
          EXIT(EXIT_FAILURE) << "Error: " << event.error().message();
          break;
        }

        // Events this example does not act upon.
        case Event::HEARTBEAT:
        case Event::RESCIND:
        case Event::RESCIND_INVERSE_OFFER:
        case Event::MESSAGE: {
          break;
        }

        case Event::UNKNOWN: {
          LOG(WARNING) << "Received an UNKNOWN event and ignored";
          break;
        }
      }
    }
  }

private:
  // Handles a batch of resource offers. Every offer is either declined,
  // or accepted to launch exactly one sleep task. At most one existing
  // task is migrated (killed and replaced) per batch.
  void resourceOffers(const std::vector<Offer>& offers)
  {
    CHECK(framework.has_id());

    // Of existing sleep tasks, identify the one running on an agent
    // with the least expected uptime (i.e. next to be maintained).
    // We'll see if we can migrate this sleep task.
    //
    // Sleepers with an unset (zero) unavailability have no known
    // maintenance, so they are never candidates. Without this guard a
    // zero value would compare as "earlier" than any real maintenance
    // window and displace the agent that is actually at risk.
    Option<AgentID> riskiestAgent;
    foreachpair (const AgentID& agentId, const SleeperInfo& sleeper, sleepers) {
      const int64_t start = sleeper.unavailability.nanoseconds();

      const bool isRiskier =
        riskiestAgent.isNone() ||
        start < sleepers.at(riskiestAgent.get()).unavailability.nanoseconds();

      if (start > 0 && isRiskier) {
        riskiestAgent = agentId;
      }
    }

    // Resources needed by a single sleep task. This does not depend on
    // the offer, so it is computed once for the whole batch.
    const Resources taskResources = [this]() {
      Resources resources = Resources::parse(
          "cpus:" + stringify(CPUS_PER_TASK) +
          ";mem:" + stringify(MEM_PER_TASK)).get();
      resources.allocate(framework.role());
      return resources;
    }();

    foreach (const Offer& offer, offers) {
      // Are there already `num_task` sleep tasks running?
      // Having `num_task` sleeps running takes priority over dealing
      // with maintenance.
      bool needMoreSleep = sleepers.size() < num_tasks;

      // Is the agent in the offer less risky than our riskiest agent?
      // i.e. The offered agent's planned downtime is farther away.
      bool offeredAgentIsLessRisky = riskiestAgent.isSome() &&
        (!offer.has_unavailability() ||
         offer.unavailability().start().nanoseconds() >
           sleepers.at(riskiestAgent.get()).unavailability.nanoseconds());

      // Are we already running a task on this agent?
      // This scheduler will only launch one task per agent.
      bool offeredAgentIsOccupied = sleepers.contains(offer.agent_id());

      // We only need to accept an offer if we do not have enough sleep
      // tasks active, or the offer provides a better agent.
      bool needToLaunchTask = !offeredAgentIsOccupied &&
        (needMoreSleep || offeredAgentIsLessRisky);

      Resources resources(offer.resources());

      // Check if this offer is big enough and if we need to launch anything.
      if (!resources.toUnreserved().contains(taskResources) ||
          !needToLaunchTask) {
        Call call;
        call.mutable_framework_id()->CopyFrom(framework.id());
        call.set_type(Call::DECLINE);

        Call::Decline* decline = call.mutable_decline();
        decline->add_offer_ids()->CopyFrom(offer.id());

        // Sets the decline filter's `refuse_seconds` to 600.
        decline->mutable_filters()->set_refuse_seconds(600);

        mesos->send(call);
        continue;
      }

      // Keeping `num_tasks` running has higher priority than migrating tasks.
      // We only migrate tasks if there are enough running tasks.
      if (!needMoreSleep && offeredAgentIsLessRisky) {
        LOG(INFO) << "Migrating task " << sleepers[riskiestAgent.get()].taskId
                  << " from " << riskiestAgent.get();

        Call call;
        call.mutable_framework_id()->CopyFrom(framework.id());
        call.set_type(Call::KILL);

        Call::Kill* kill = call.mutable_kill();
        kill->mutable_task_id()->CopyFrom(sleepers[riskiestAgent.get()].taskId);
        kill->mutable_agent_id()->CopyFrom(riskiestAgent.get());

        mesos->send(call);

        // Keep track of this sleeper in another map.
        migrating[riskiestAgent.get()] = sleepers[riskiestAgent.get()];
        sleepers.erase(riskiestAgent.get());

        // For simplicity, we only migrate one task per round of offers.
        // Clearing the `riskiestAgent` means we will no longer consider
        // migrating tasks in this loop.
        riskiestAgent = None();

        // Since we killed a task, we need to start another one.
        needMoreSleep = true;
      }

      if (needMoreSleep) {
        LOG(INFO) << "Starting task " << tasks_launched
                  << " on " << offer.agent_id();

        // The running launch counter doubles as the task ID and is part
        // of the task name; it is bumped once per launched task.
        TaskInfo task;
        task.mutable_task_id()->set_value(stringify(tasks_launched));
        task.set_name("Sleeper Agent " + stringify(tasks_launched++));
        task.mutable_agent_id()->MergeFrom(offer.agent_id());
        task.mutable_resources()->CopyFrom(taskResources);
        task.mutable_command()->set_value(
            "while [ true ]; do echo ZZZzzz...; sleep 5; done");

        Call call;
        call.mutable_framework_id()->CopyFrom(framework.id());
        call.set_type(Call::ACCEPT);

        Call::Accept* accept = call.mutable_accept();
        accept->add_offer_ids()->CopyFrom(offer.id());

        Offer::Operation* operation = accept->add_operations();
        operation->set_type(Offer::Operation::LAUNCH);
        operation->mutable_launch()->add_task_infos()->CopyFrom(task);

        mesos->send(call);

        // Save the new sleep task.
        SleeperInfo sleeper;
        sleeper.taskId = task.task_id();

        if (offer.has_unavailability()) {
          sleeper.unavailability = offer.unavailability().start();
        }

        sleepers[offer.agent_id()] = sleeper;
      }
    }
  }

  // Handles a batch of inverse offers. For agents that hold an active
  // sleep task, the announced maintenance start is recorded and the
  // inverse offer is accepted. Other inverse offers are only logged.
  void inverseOffers(const std::vector<InverseOffer>& offers)
  {
    foreach (const InverseOffer& offer, offers) {
      if (!sleepers.contains(offer.agent_id())) {
        LOG(INFO) << "Inverse offer received for " << offer.agent_id()
                  << " which does not hold an active sleep task.";
        continue;
      }

      // Take note of any agents that are scheduled for maintenance.
      sleepers[offer.agent_id()].unavailability =
        offer.unavailability().start();

      // TODO(josephw): Demonstrate some semantics for declining inverse
      // offers. This framework currently always accepts inverse offers.
      Call call;
      CHECK(framework.has_id());
      call.mutable_framework_id()->CopyFrom(framework.id());
      call.set_type(Call::ACCEPT_INVERSE_OFFERS);

      Call::AcceptInverseOffers* accept = call.mutable_accept_inverse_offers();
      accept->add_inverse_offer_ids()->CopyFrom(offer.id());

      mesos->send(call);
    }
  }

  // Logs a task status update, acknowledges it when it carries a UUID,
  // and drops the bookkeeping of tasks that reached a terminal state.
  void statusUpdate(const TaskStatus& status)
  {
    LOG(INFO)
      << "Task " << status.task_id().value()
      << " is in state " << TaskState_Name(status.state())
      << (status.has_message() ? " with message: " + status.message() : "");

    if (status.has_uuid()) {
      Call call;
      call.set_type(Call::ACKNOWLEDGE);

      CHECK(framework.has_id());
      call.mutable_framework_id()->CopyFrom(framework.id());

      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
      acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
      acknowledge->set_uuid(status.uuid());

      mesos->send(call);
    }

    const AgentID& agentId = status.agent_id();
    const std::string& taskId = status.task_id().value();

    // An agent can appear in `migrating` (old task) and in `sleepers`
    // (newer task) at the same time, so a record is only touched when
    // it belongs to the task this update is about.
    const bool isMigratingTask =
      migrating.contains(agentId) &&
      migrating.at(agentId).taskId.value() == taskId;

    const bool isSleepingTask =
      sleepers.contains(agentId) &&
      sleepers.at(agentId).taskId.value() == taskId;

    // This is the only expected terminal state.
    if (isMigratingTask && status.state() == TASK_KILLED) {
      ++metrics.sleepers_killed;
      migrating.erase(agentId);
      return;
    }

    // These are un-expected terminal states.
    if (status.state() == TASK_FINISHED ||
        status.state() == TASK_LOST ||
        status.state() == TASK_FAILED ||
        status.state() == TASK_ERROR ||
        status.state() == TASK_KILLED) {
      ++metrics.sleepers_lost_abnormally;

      if (isSleepingTask) {
        sleepers.erase(agentId);
      }

      if (isMigratingTask) {
        migrating.erase(agentId);
      }
    }
  }

  // Sends a TEARDOWN call when the process finalizes while subscribed.
  void finalize() override
  {
    if (state == SUBSCRIBED) {
      Call call;
      CHECK(framework.has_id());
      call.mutable_framework_id()->CopyFrom(framework.id());
      call.set_type(Call::TEARDOWN);

      mesos->send(call);
    }
  }

  // Framework description; its ID is filled in on the SUBSCRIBED event.
  FrameworkInfo framework;
  const std::string master;

  // Target number of concurrently running sleep tasks.
  const uint32_t num_tasks;
  const Option<Credential> credential;

  // Agents which currently hold a sleep task.
  hashmap<AgentID, SleeperInfo> sleepers;

  // Agents whose sleep task was sent a KILL call as part of a migration
  // and has not yet reported a terminal state.
  hashmap<AgentID, SleeperInfo> migrating;

  // Number of tasks launched so far; also used to derive task IDs.
  int tasks_launched;

  process::Owned<scheduler::Mesos> mesos;

  enum State
  {
    DISCONNECTED,
    CONNECTED,
    SUBSCRIBED
  } state;

  // Time at which this scheduler object was constructed.
  process::Time start_time;

  // Gauge callbacks; `Metrics` below defers to these on this process.

  // Seconds elapsed since construction.
  double _uptime_secs()
  {
    return (Clock::now() - start_time).secs();
  }

  // 1 while subscribed, 0 otherwise.
  double _subscribed()
  {
    return state == SUBSCRIBED ? 1 : 0;
  }

  // Active sleep tasks plus those still being migrated away.
  double _current_sleepers()
  {
    return sleepers.size() + migrating.size();
  }

  // Bundles the framework's metrics. They are added to the metrics
  // library on construction and removed again on destruction.
  struct Metrics
  {
    explicit Metrics(const InverseOfferScheduler& scheduler)
      : uptime_secs(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/uptime_secs",
            defer(scheduler, &InverseOfferScheduler::_uptime_secs)),
        subscribed(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/subscribed",
            defer(scheduler, &InverseOfferScheduler::_subscribed)),
        offers_received(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/offers_received"),
        inverse_offers_received(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/inverse_offers_received"),
        sleepers_killed(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/sleepers_killed"),
        sleepers_lost_abnormally(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/sleepers_lost_abnormally"),
        current_sleepers(
            std::string(FRAMEWORK_METRICS_PREFIX) + "/current_sleepers",
            defer(scheduler, &InverseOfferScheduler::_current_sleepers))
    {
      process::metrics::add(uptime_secs);
      process::metrics::add(subscribed);
      process::metrics::add(offers_received);
      process::metrics::add(inverse_offers_received);
      process::metrics::add(sleepers_killed);
      process::metrics::add(sleepers_lost_abnormally);
      process::metrics::add(current_sleepers);
    }

    ~Metrics()
    {
      process::metrics::remove(uptime_secs);
      process::metrics::remove(subscribed);
      process::metrics::remove(offers_received);
      process::metrics::remove(inverse_offers_received);
      process::metrics::remove(sleepers_killed);
      process::metrics::remove(sleepers_lost_abnormally);
      process::metrics::remove(current_sleepers);
    }

    process::metrics::PullGauge uptime_secs;
    process::metrics::PullGauge subscribed;

    process::metrics::Counter offers_received;
    process::metrics::Counter inverse_offers_received;

    // The only expected terminal state is TASK_KILLED.
    // Other terminal states are considered incorrect.
    process::metrics::Counter sleepers_killed;
    process::metrics::Counter sleepers_lost_abnormally;

    process::metrics::PullGauge current_sleepers;
  } metrics;
};
// Command-line flags of this example: the shared example flags plus
// `--num_tasks`.
class Flags : public virtual mesos::internal::examples::Flags
{
public:
  Flags()
  {
    // Accepts strictly positive task counts only.
    auto validateNumTasks = [](int value) -> Option<Error> {
      if (value > 0) {
        return None();
      }

      return Error("Expected --num_tasks greater than zero");
    };

    add(&Flags::num_tasks,
        "num_tasks",
        "Number of sleep tasks to run at once. Each task is started on\n"
        "a separate machine. The scheduler will attempt to migrate tasks\n"
        "to other machines ahead of planned maintenance.",
        1,
        validateNumTasks);
  }

  // Number of sleep tasks to keep running; defaults to 1.
  int num_tasks;
};
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (flags.help) {
std::cout << flags.usage() << std::endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
std::cerr << flags.usage(load.error()) << std::endl;
return EXIT_FAILURE;
}
mesos::internal::logging::initialize(argv[0], true, flags); // Catch signals.
// Log any flag warnings.
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
// Nothing special to say about this framework.
FrameworkInfo framework;
framework.set_user(os::user().get());
framework.set_name(FRAMEWORK_NAME);
framework.set_role(flags.role);
framework.set_checkpoint(flags.checkpoint);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
Option<Credential> credential = None();
if (flags.authenticate) {
LOG(INFO) << "Enabling authentication for the framework";
Credential credential_;
credential_.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential_.set_secret(flags.secret.get());
}
credential = credential_;
}
if (flags.master == "local") {
// Configure master.
os::setenv("MESOS_ROLES", flags.role);
os::setenv(
"MESOS_AUTHENTICATE_HTTP_FRAMEWORKS",
stringify(flags.authenticate));
os::setenv("MESOS_HTTP_FRAMEWORK_AUTHENTICATORS", "basic");
mesos::ACLs acls;
mesos::ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
acl->mutable_roles()->add_values(flags.role);
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
}
process::Owned<InverseOfferScheduler> scheduler(new InverseOfferScheduler(
framework,
flags.master,
flags.num_tasks,
credential));
process::spawn(scheduler.get());
process::wait(scheduler.get());
return EXIT_SUCCESS;
}