blob: d6f74c2d812f22ad6914f6cd31891d5e6bccd50b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authorizer/acls.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/os/realpath.hpp>
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "examples/flags.hpp"
using namespace mesos;
using boost::lexical_cast;
using std::cerr;
using std::cout;
using std::endl;
using std::flush;
using std::string;
using std::vector;
using mesos::Resources;
const int32_t CPUS_PER_TASK = 1;
const int32_t MEM_PER_TASK = 128;
constexpr char EXECUTOR_BINARY[] = "test-executor";
constexpr char EXECUTOR_NAME[] = "Test Executor (C++)";
constexpr char FRAMEWORK_NAME[] = "Test Framework (C++)";
class TestScheduler : public Scheduler
{
public:
TestScheduler(
bool _implicitAcknowledgements,
const ExecutorInfo& _executor,
const string& _role)
: implicitAcknowledgements(_implicitAcknowledgements),
executor(_executor),
role(_role),
tasksLaunched(0),
tasksFinished(0),
totalTasks(5) {}
~TestScheduler() override {}
void registered(SchedulerDriver*,
const FrameworkID&,
const MasterInfo&) override
{
cout << "Registered!" << endl;
}
void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) override {}
void disconnected(SchedulerDriver* driver) override {}
void resourceOffers(SchedulerDriver* driver,
const vector<Offer>& offers) override
{
foreach (const Offer& offer, offers) {
cout << "Received offer " << offer.id() << " with " << offer.resources()
<< endl;
Resources taskResources = Resources::parse(
"cpus:" + stringify(CPUS_PER_TASK) +
";mem:" + stringify(MEM_PER_TASK)).get();
taskResources.allocate(role);
Resources remaining = offer.resources();
// Launch tasks.
vector<TaskInfo> tasks;
while (tasksLaunched < totalTasks &&
remaining.toUnreserved().contains(taskResources)) {
int taskId = tasksLaunched++;
cout << "Launching task " << taskId << " using offer "
<< offer.id() << endl;
TaskInfo task;
task.set_name("Task " + lexical_cast<string>(taskId));
task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
task.mutable_slave_id()->MergeFrom(offer.slave_id());
task.mutable_executor()->MergeFrom(executor);
Option<Resources> resources = [&]() {
if (role == "*") {
return remaining.find(taskResources);
}
Resource::ReservationInfo reservation;
reservation.set_type(Resource::ReservationInfo::STATIC);
reservation.set_role(role);
return remaining.find(taskResources.pushReservation(reservation));
}();
CHECK_SOME(resources);
task.mutable_resources()->MergeFrom(resources.get());
remaining -= resources.get();
tasks.push_back(task);
}
driver->launchTasks(offer.id(), tasks);
}
}
void offerRescinded(SchedulerDriver* driver, const OfferID& offerId) override
{}
void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) override
{
int taskId = lexical_cast<int>(status.task_id().value());
cout << "Task " << taskId << " is in state " << status.state() << endl;
if (status.state() == TASK_FINISHED) {
tasksFinished++;
}
if (status.state() == TASK_LOST ||
status.state() == TASK_KILLED ||
status.state() == TASK_FAILED) {
cout << "Aborting because task " << taskId
<< " is in unexpected state " << status.state()
<< " with reason " << status.reason()
<< " from source " << status.source()
<< " with message '" << status.message() << "'" << endl;
driver->abort();
}
if (!implicitAcknowledgements) {
driver->acknowledgeStatusUpdate(status);
}
if (tasksFinished == totalTasks) {
driver->stop();
}
}
void frameworkMessage(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data) override
{}
void slaveLost(SchedulerDriver* driver, const SlaveID& sid) override {}
void executorLost(
SchedulerDriver* driver,
const ExecutorID& executorID,
const SlaveID& slaveID,
int status) override
{}
void error(SchedulerDriver* driver, const string& message) override
{
cout << message << endl;
}
private:
const bool implicitAcknowledgements;
const ExecutorInfo executor;
string role;
int tasksLaunched;
int tasksFinished;
int totalTasks;
};
void usage(const char* argv0, const flags::FlagsBase& flags)
{
cerr << "Usage: " << Path(argv0).basename() << " [...]" << endl
<< endl
<< "Supported options:" << endl
<< flags.usage();
}
class Flags : public virtual mesos::internal::examples::Flags {};
int main(int argc, char** argv)
{
// Find this executable's directory to locate executor.
string uri;
Option<string> value = os::getenv("MESOS_HELPER_DIR");
if (value.isSome()) {
uri = path::join(value.get(), EXECUTOR_BINARY);
} else {
uri =
path::join(os::realpath(Path(argv[0]).dirname()).get(), EXECUTOR_BINARY);
}
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
internal::logging::initialize(argv[0], true, flags); // Catch signals.
// Log any flag warnings (after logging is initialized).
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
ExecutorInfo executor;
executor.mutable_executor_id()->set_value("default");
executor.mutable_command()->set_value(uri);
executor.set_name(EXECUTOR_NAME);
FrameworkInfo framework;
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_principal(flags.principal);
framework.set_name(FRAMEWORK_NAME);
framework.add_roles(flags.role);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::MULTI_ROLE);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
framework.set_checkpoint(flags.checkpoint);
bool implicitAcknowledgements = true;
if (os::getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS").isSome()) {
cout << "Enabling explicit acknowledgements for status updates" << endl;
implicitAcknowledgements = false;
}
if (flags.master == "local") {
// Configure master.
os::setenv("MESOS_ROLES", flags.role);
os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate));
ACLs acls;
ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(ACL::Entity::ANY);
acl->mutable_roles()->add_values(flags.role);
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
// Configure agent.
os::setenv("MESOS_DEFAULT_ROLE", flags.role);
}
MesosSchedulerDriver* driver;
TestScheduler scheduler(implicitAcknowledgements, executor, flags.role);
if (flags.authenticate) {
cout << "Enabling authentication for the framework" << endl;
Credential credential;
credential.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential.set_secret(flags.secret.get());
}
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master,
implicitAcknowledgements,
credential);
} else {
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master,
implicitAcknowledgements);
}
int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
// Ensure that the driver process terminates.
driver->stop();
delete driver;
return status;
}