blob: 41e2b435d28c592f6047d166ee1d016236e043e2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>

#include <mesos/authorizer/acls.hpp>

#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/time.hpp>

#include <process/metrics/counter.hpp>
#include <process/metrics/pull_gauge.hpp>
#include <process/metrics/metrics.hpp>

#include <stout/bytes.hpp>
#include <stout/flags.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/result.hpp>
#include <stout/stringify.hpp>

#include <stout/os/realpath.hpp>

#include "common/parse.hpp"

#include "examples/flags.hpp"

#include "logging/logging.hpp"
using namespace mesos;
using namespace mesos::internal;
using std::string;
using google::protobuf::RepeatedPtrField;
using process::Clock;
using process::defer;
using process::metrics::PullGauge;
using process::metrics::Counter;
// Fixed CPU share requested for every balloon task. The task's memory
// request is not fixed here; it comes from the `--task_memory` flag.
constexpr double CPUS_PER_TASK = 0.1;

// Resources requested for the executor itself. `MEM_PER_EXECUTOR` is in
// megabytes and also acts as the lower bound accepted for
// `--task_memory_usage_limit`.
constexpr double CPUS_PER_EXECUTOR = 0.1;
constexpr int32_t MEM_PER_EXECUTOR = 64;

// File name of the executor binary, joined onto either `<build_dir>/src`
// or the directory containing this framework's binary.
constexpr char EXECUTOR_BINARY[] = "balloon-executor";

// Common prefix for every metric registered by this framework.
constexpr char FRAMEWORK_METRICS_PREFIX[] = "balloon_framework";
// Command-line flags for the balloon framework. The common example flags
// that `main` reads (e.g. `--master`, `--role`, `--principal`) are
// inherited from `mesos::internal::examples::Flags`.
class Flags : public virtual mesos::internal::examples::Flags
{
public:
  Flags()
  {
    add(&Flags::name,
        "name",
        "Name to be used by the framework.",
        "Balloon Framework");

    // No default value is supplied (`nullptr`). The validator rejects any
    // limit smaller than the memory requested for the executor itself.
    add(&Flags::task_memory_usage_limit,
        "task_memory_usage_limit",
        None(),
        "Maximum size, in bytes, of the task's memory usage.\n"
        "The task will attempt to occupy memory up until this limit.",
        static_cast<const Bytes*>(nullptr),
        [](const Bytes& value) -> Option<Error> {
          // A value of exactly `MEM_PER_EXECUTOR` MB is accepted, so the
          // message must say "at least" rather than "greater than".
          if (value < Bytes(MEM_PER_EXECUTOR, Bytes::MEGABYTES)) {
            return Error(
                "Please use a --task_memory_usage_limit of at least " +
                stringify(MEM_PER_EXECUTOR) + " MB");
          }

          return None();
        });

    // NOTE: The scheduler always requests exactly this amount of memory
    // for each task (see `BalloonSchedulerProcess::resourceOffers`); it
    // never falls back to the memory available in an offer, so the help
    // text must not promise that.
    add(&Flags::task_memory,
        "task_memory",
        "How much memory the framework will require per task.\n"
        "This does not include the memory allocated to the executor.");

    add(&Flags::build_dir,
        "build_dir",
        "The build directory of Mesos. If set, the framework will assume\n"
        "that the executor, framework, and agent(s) all live on the same\n"
        "machine.");

    add(&Flags::executor_uri,
        "executor_uri",
        "URI the fetcher should use to get the executor's binary.\n"
        "NOTE: This flag is deprecated in favor of `--executor_uris`");

    add(&Flags::executor_uris,
        "executor_uris",
        "The value could be a JSON-formatted string of `URI`s that\n"
        "should be fetched before running the executor, or a file\n"
        "path containing the JSON-formatted `URI`s. Path must be of\n"
        "the form `file:///path/to/file` or `/path/to/file`.\n"
        "This flag replaces `--executor_uri`.\n"
        "See the `CommandInfo::URI` message in `mesos.proto` for the\n"
        "expected format.\n"
        "Example:\n"
        "[\n"
        " {\n"
        " \"value\":\"mesos.apache.org/balloon_executor\",\n"
        " \"executable\":\"true\"\n"
        " },\n"
        " {\n"
        " \"value\":\"mesos.apache.org/bundle_for_executor.tar.gz\",\n"
        " \"cache\":\"true\"\n"
        " }\n"
        "]");

    add(&Flags::executor_command,
        "executor_command",
        "The command that should be used to start the executor.\n"
        "This will override the value set by `--build_dir`.");

    add(&Flags::long_running,
        "long_running",
        "Whether this framework should launch tasks repeatedly\n"
        "or exit after finishing a single task.",
        false);
  }

  // Framework name; also used as the prefix of task and executor names.
  string name;

  // Sent to the executor as the task's `data`: the task tries to occupy
  // memory up to this many bytes.
  Bytes task_memory_usage_limit;

  // Memory requested for each task, excluding the executor's own memory.
  Bytes task_memory;

  // Flags for specifying the executor binary and other URIs.
  //
  // TODO(armand): Remove the `--executor_uri` flag after the
  // deprecation cycle, started in 1.4.0.

  // When set, the executor is run from `<build_dir>/src`.
  Option<string> build_dir;

  // Deprecated single-URI form of `executor_uris`; mutually exclusive
  // with it.
  Option<string> executor_uri;

  // JSON array of `CommandInfo::URI`s fetched before running the executor.
  Option<JSON::Array> executor_uris;

  // Explicit executor command; takes precedence over `build_dir`.
  Option<string> executor_command;

  // If false, the scheduler stops the driver (or aborts it on an OOM
  // failure) once a task reaches a terminal state.
  bool long_running;
};
// Actor holding the business logic and metrics for the `BalloonScheduler`.
// See `BalloonScheduler` below for the intended behavior.
//
// At most one task is active at a time. Tasks are numbered 0, 1, 2, ...
// and each task's number is used as both its task ID and executor ID.
class BalloonSchedulerProcess : public process::Process<BalloonSchedulerProcess>
{
public:
  // NOTE: Assumes `_frameworkInfo` lists at least one role; only the first
  // role is used when allocating task and executor resources.
  BalloonSchedulerProcess(
      const FrameworkInfo& _frameworkInfo,
      const ExecutorInfo& _executor,
      const Flags& _flags)
    : frameworkInfo(_frameworkInfo),
      role(_frameworkInfo.roles(0)),
      executor(_executor),
      flags(_flags),
      taskActive(false),
      tasksLaunched(0),
      start_time(Clock::now()),
      isRegistered(false),
      metrics(*this) {}

  // Marks the framework as registered; reported by the `registered` gauge.
  void registered()
  {
    isRegistered = true;
  }

  // Marks the framework as no longer registered.
  void disconnected()
  {
    isRegistered = false;
  }

  // Launches one task on the first offer that can hold both the task and
  // the executor, unless a task is already active. Every other offer is
  // declined with a 600 second refusal filter.
  void resourceOffers(
      SchedulerDriver* driver,
      const std::vector<Offer>& offers)
  {
    // The task requests a fixed CPU share plus `--task_memory`, which the
    // `mem` resource expects in megabytes.
    Resources taskResources = Resources::parse(
        "cpus:" + stringify(CPUS_PER_TASK) +
        ";mem:" + stringify(
            static_cast<double>(flags.task_memory.bytes()) /
            Bytes::MEGABYTES)).get();
    taskResources.allocate(role);

    Resources executorResources = Resources(executor.resources());
    executorResources.allocate(role);

    foreach (const Offer& offer, offers) {
      Resources resources(offer.resources());

      // If there is an active task, or if the offer is not
      // big enough, reject the offer.
      if (taskActive ||
          !resources.toUnreserved().contains(
              taskResources + executorResources)) {
        Filters filters;
        filters.set_refuse_seconds(600);

        driver->declineOffer(offer.id(), filters);
        continue;
      }

      int taskId = tasksLaunched++;

      LOG(INFO) << "Launching task " << taskId;

      TaskInfo task;
      task.set_name(flags.name + " Task");
      task.mutable_task_id()->set_value(stringify(taskId));
      task.mutable_slave_id()->MergeFrom(offer.slave_id());
      task.mutable_resources()->CopyFrom(taskResources);
      task.set_data(stringify(flags.task_memory_usage_limit));

      // Each task gets its own executor, identified by the task's number.
      task.mutable_executor()->CopyFrom(executor);
      task.mutable_executor()->mutable_executor_id()->set_value(
          stringify(taskId));

      driver->launchTasks(offer.id(), {task});

      taskActive = true;
    }
  }

  // Handles a task status update.
  //
  // Without `--long_running`, an OOM failure aborts the driver, and
  // TASK_FAILED, TASK_FINISHED, TASK_KILLED, TASK_LOST or TASK_ERROR stop
  // it. This check runs before the stale-update filter below, so it
  // applies to updates from any task. Updates for the most recently
  // launched task then adjust `taskActive` and the metrics counters.
  void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
  {
    if (!flags.long_running) {
      if (status.state() == TASK_FAILED &&
          status.reason() == TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY) {
        // NOTE: We expect TASK_FAILED when this scheduler is launched by the
        // balloon_framework_test.sh shell script. The abort here ensures the
        // script considers the test result as "PASS".
        driver->abort();
      } else if (status.state() == TASK_FAILED ||
                 status.state() == TASK_FINISHED ||
                 status.state() == TASK_KILLED ||
                 status.state() == TASK_LOST ||
                 status.state() == TASK_ERROR) {
        driver->stop();
      }
    }

    if (stringify(tasksLaunched - 1) != status.task_id().value()) {
      // We might receive messages from older tasks. Ignore them.
      LOG(INFO) << "Ignoring status update from older task "
                << status.task_id();
      return;
    }

    switch (status.state()) {
      case TASK_FINISHED:
        taskActive = false;
        ++metrics.tasks_finished;
        break;
      case TASK_FAILED:
        taskActive = false;
        if (status.reason() == TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY) {
          ++metrics.tasks_oomed;
        }

        // NOTE: Fetching the executor (e.g. `--executor_uri`) may fail
        // occasionally if the URI is rate limited. This case is common
        // enough that it makes sense to track this failure metric separately.
        if (status.reason() == TaskStatus::REASON_CONTAINER_LAUNCH_FAILED) {
          ++metrics.launch_failures;
        }
        break;
      case TASK_KILLED:
      case TASK_LOST:
      case TASK_ERROR:
        taskActive = false;
        if (status.reason() != TaskStatus::REASON_INVALID_OFFERS) {
          ++metrics.abnormal_terminations;
        }
        break;
      case TASK_RUNNING:
        ++metrics.tasks_running;
        break;
      // We ignore uninteresting transient task status updates.
      case TASK_KILLING:
      case TASK_STAGING:
      case TASK_STARTING:
        break;
      // We ignore task status updates related to reconciliation.
      case TASK_DROPPED:
      case TASK_GONE:
      case TASK_GONE_BY_OPERATOR:
      case TASK_UNKNOWN:
      case TASK_UNREACHABLE:
        break;
    }
  }

private:
  const FrameworkInfo frameworkInfo;

  // First role of the framework; all task/executor resources use it.
  const string role;

  const ExecutorInfo executor;
  const Flags flags;

  // Set when a task is launched; cleared when the latest task reports
  // TASK_FINISHED, TASK_FAILED, TASK_KILLED, TASK_LOST or TASK_ERROR.
  bool taskActive;

  // Number of tasks launched so far, i.e. the ID of the next task.
  int tasksLaunched;

  // Time this actor was constructed; basis of the `uptime_secs` gauge.
  process::Time start_time;

  // Value of the `uptime_secs` gauge.
  double _uptime_secs()
  {
    return (Clock::now() - start_time).secs();
  }

  bool isRegistered;

  // Value of the `registered` gauge: 1 when registered, 0 otherwise.
  double _registered()
  {
    return isRegistered ? 1 : 0;
  }

  // Metrics published under `FRAMEWORK_METRICS_PREFIX`. Every metric added
  // in the constructor must also be removed in the destructor, otherwise
  // it stays registered after this actor is gone.
  struct Metrics
  {
    explicit Metrics(const BalloonSchedulerProcess& _scheduler)
      : uptime_secs(
            string(FRAMEWORK_METRICS_PREFIX) + "/uptime_secs",
            defer(_scheduler, &BalloonSchedulerProcess::_uptime_secs)),
        registered(
            string(FRAMEWORK_METRICS_PREFIX) + "/registered",
            defer(_scheduler, &BalloonSchedulerProcess::_registered)),
        tasks_finished(string(FRAMEWORK_METRICS_PREFIX) + "/tasks_finished"),
        tasks_oomed(string(FRAMEWORK_METRICS_PREFIX) + "/tasks_oomed"),
        tasks_running(string(FRAMEWORK_METRICS_PREFIX) + "/tasks_running"),
        launch_failures(string(FRAMEWORK_METRICS_PREFIX) + "/launch_failures"),
        abnormal_terminations(
            string(FRAMEWORK_METRICS_PREFIX) + "/abnormal_terminations")
    {
      process::metrics::add(uptime_secs);
      process::metrics::add(registered);

      process::metrics::add(tasks_finished);
      process::metrics::add(tasks_oomed);
      process::metrics::add(tasks_running);
      process::metrics::add(launch_failures);
      process::metrics::add(abnormal_terminations);
    }

    ~Metrics()
    {
      process::metrics::remove(uptime_secs);
      process::metrics::remove(registered);

      process::metrics::remove(tasks_finished);
      process::metrics::remove(tasks_oomed);
      // Previously missing: `tasks_running` is added above and must be
      // removed as well.
      process::metrics::remove(tasks_running);
      process::metrics::remove(launch_failures);
      process::metrics::remove(abnormal_terminations);
    }

    process::metrics::PullGauge uptime_secs;
    process::metrics::PullGauge registered;

    process::metrics::Counter tasks_finished;
    process::metrics::Counter tasks_oomed;
    process::metrics::Counter tasks_running;
    process::metrics::Counter launch_failures;
    process::metrics::Counter abnormal_terminations;
  } metrics;
};
// This scheduler starts a single executor and task which gradually
// increases its memory footprint up to a limit. Depending on the
// resource limits set for the container, the framework expects the
// executor to either finish successfully or be OOM-killed.
class BalloonScheduler : public Scheduler
{
public:
BalloonScheduler(
const FrameworkInfo& _frameworkInfo,
const ExecutorInfo& _executor,
const Flags& _flags)
: process(_frameworkInfo, _executor, _flags)
{
process::spawn(process);
}
~BalloonScheduler() override
{
process::terminate(process);
process::wait(process);
}
void registered(
SchedulerDriver*,
const FrameworkID& frameworkId,
const MasterInfo&) override
{
LOG(INFO) << "Registered with framework ID: " << frameworkId;
process::dispatch(&process, &BalloonSchedulerProcess::registered);
}
void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) override
{
LOG(INFO) << "Reregistered";
process::dispatch(&process, &BalloonSchedulerProcess::registered);
}
void disconnected(SchedulerDriver* driver) override
{
LOG(INFO) << "Disconnected";
process::dispatch(&process, &BalloonSchedulerProcess::disconnected);
}
void resourceOffers(
SchedulerDriver* driver,
const std::vector<Offer>& offers) override
{
LOG(INFO) << "Resource offers received";
process::dispatch(
&process,
&BalloonSchedulerProcess::resourceOffers,
driver,
offers);
}
void offerRescinded(SchedulerDriver* driver, const OfferID& offerId) override
{
LOG(INFO) << "Offer rescinded";
}
void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) override
{
LOG(INFO) << "Task " << status.task_id() << " in state "
<< TaskState_Name(status.state())
<< ", Source: " << status.source()
<< ", Reason: " << status.reason()
<< (status.has_message() ? ", Message: " + status.message() : "");
process::dispatch(
&process,
&BalloonSchedulerProcess::statusUpdate,
driver,
status);
}
void frameworkMessage(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data) override
{
LOG(INFO) << "Framework message: " << data;
}
void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId) override
{
LOG(INFO) << "Agent lost: " << slaveId;
}
void executorLost(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
int status) override
{
LOG(INFO) << "Executor '" << executorId << "' lost on agent: " << slaveId;
}
void error(SchedulerDriver* driver, const string& message) override
{
LOG(INFO) << "Error message: " << message;
}
private:
BalloonSchedulerProcess process;
};
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (flags.help) {
std::cout << flags.usage() << std::endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
std::cerr << flags.usage(load.error()) << std::endl;
return EXIT_FAILURE;
}
logging::initialize(argv[0], false);
// Log any flag warnings.
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
const Resources resources = Resources::parse(
"cpus:" + stringify(CPUS_PER_EXECUTOR) +
";mem:" + stringify(MEM_PER_EXECUTOR)).get();
ExecutorInfo executor;
executor.mutable_resources()->CopyFrom(resources);
executor.set_name(flags.name + " Executor");
// Determine the command to run the executor based on three possibilities:
// 1) `--executor_command` was set, which overrides the below cases.
// 2) We are in the Mesos build directory, so the targeted executable
// is actually a libtool wrapper script.
// 3) We have not detected the Mesos build directory, so assume the
// executor is in the same directory as the framework.
string command;
// Find this executable's directory to locate executor.
if (flags.executor_command.isSome()) {
command = flags.executor_command.get();
} else if (flags.build_dir.isSome()) {
command = path::join(flags.build_dir.get(), "src", EXECUTOR_BINARY);
} else {
command =
path::join(os::realpath(Path(argv[0]).dirname()).get(), EXECUTOR_BINARY);
}
executor.mutable_command()->set_value(command);
if (flags.executor_uris.isSome() && flags.executor_uri.isSome()) {
EXIT(EXIT_FAILURE)
<< "Flag '--executor_uris' shall not be used with '--executor_uri'";
}
// Copy `--executor_uri` into the command.
if (flags.executor_uri.isSome()) {
LOG(WARNING)
<< "Flag '--executor_uri' is deprecated, use '--executor_uris' instead";
mesos::CommandInfo::URI* uri = executor.mutable_command()->add_uris();
uri->set_value(flags.executor_uri.get());
uri->set_executable(true);
}
// Copy `--executor_uris` into the command.
if (flags.executor_uris.isSome()) {
Try<RepeatedPtrField<mesos::CommandInfo::URI>> parse =
::protobuf::parse<RepeatedPtrField<mesos::CommandInfo::URI>>(
flags.executor_uris.get());
if (parse.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to convert '--executor_uris' to protobuf: " << parse.error();
}
executor.mutable_command()->mutable_uris()->CopyFrom(parse.get());
}
FrameworkInfo framework;
framework.set_user(os::user().get());
framework.set_principal(flags.principal);
framework.set_name(flags.name);
framework.set_checkpoint(flags.checkpoint);
framework.add_roles(flags.role);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::MULTI_ROLE);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
BalloonScheduler scheduler(framework, executor, flags);
if (flags.master == "local") {
// Configure master.
os::setenv("MESOS_ROLES", flags.role);
os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate));
ACLs acls;
ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(ACL::Entity::ANY);
acl->mutable_roles()->add_values(flags.role);
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
}
MesosSchedulerDriver* driver;
if (flags.authenticate) {
LOG(INFO) << "Enabling authentication for the framework";
Credential credential;
credential.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential.set_secret(flags.secret.get());
}
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master,
credential);
} else {
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master);
}
int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
// Ensure that the driver process terminates.
driver->stop();
delete driver;
return status;
}