blob: de3d5fa6eb817423bfa7a2213511ed4e74907502 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <vector>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authorizer/acls.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/hashset.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
#include "common/protobuf_utils.hpp"
#include "common/status_utils.hpp"
#include "examples/flags.hpp"
#include "logging/logging.hpp"
using namespace mesos;
using namespace mesos::internal;
using std::cerr;
using std::cout;
using std::endl;
using std::string;
using std::vector;
using mesos::Resources;
constexpr char FRAMEWORK_NAME[] = "No Executor Framework";
static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
{
Offer::Operation operation;
operation.set_type(Offer::Operation::LAUNCH);
foreach (const TaskInfo& task, tasks) {
operation.mutable_launch()->add_task_infos()->CopyFrom(task);
}
return operation;
}
class NoExecutorScheduler : public Scheduler
{
public:
NoExecutorScheduler(
const FrameworkInfo& _frameworkInfo,
const string& _command,
const Resources& _taskResources,
const Option<size_t>& _totalTasks)
: frameworkInfo(_frameworkInfo),
command(_command),
taskResources(_taskResources),
tasksLaunched(0u),
tasksFinished(0u),
tasksTerminated(0u),
totalTasks(_totalTasks) {}
void registered(
SchedulerDriver* driver,
const FrameworkID& frameworkId,
const MasterInfo& masterInfo) override
{
LOG(INFO) << "Registered with master " << masterInfo
<< " and got framework ID " << frameworkId;
frameworkInfo.mutable_id()->CopyFrom(frameworkId);
}
void reregistered(
SchedulerDriver* driver,
const MasterInfo& masterInfo) override
{
LOG(INFO) << "Reregistered with master " << masterInfo;
}
void disconnected(SchedulerDriver* driver) override
{
LOG(INFO) << "Disconnected!";
}
void resourceOffers(
SchedulerDriver* driver,
const vector<Offer>& offers) override
{
foreach (const Offer& offer, offers) {
LOG(INFO) << "Received offer " << offer.id() << " from agent "
<< offer.slave_id() << " (" << offer.hostname() << ") "
<< "with " << offer.resources();
Resources remaining = offer.resources();
vector<TaskInfo> tasks;
while (remaining.contains(taskResources) &&
(totalTasks.isNone() || tasksLaunched < totalTasks.get())) {
TaskInfo task;
task.mutable_task_id()->set_value(stringify(tasksLaunched));
task.set_name(command);
task.mutable_slave_id()->CopyFrom(offer.slave_id());
task.mutable_resources()->CopyFrom(taskResources);
task.mutable_command()->set_shell(true);
task.mutable_command()->set_value(command);
remaining -= taskResources;
tasks.push_back(task);
activeTasks.insert(task.task_id());
tasksLaunched++;
}
driver->acceptOffers({offer.id()}, {LAUNCH(tasks)});
}
}
void offerRescinded(SchedulerDriver* driver, const OfferID& offerId) override
{
LOG(INFO) << "Offer " << offerId << " has been rescinded";
}
void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) override
{
if (!activeTasks.contains(status.task_id())) {
LOG(WARNING) << "Unknown task '" << status.task_id() << "'"
<< " is in state " << status.state();
return;
}
if (status.state() == TASK_LOST ||
status.state() == TASK_KILLED ||
status.state() == TASK_FAILED) {
LOG(ERROR) << "Task '" << status.task_id() << "'"
<< " is in unexpected state " << status.state()
<< (status.has_reason()
? " with reason " + stringify(status.reason()) : "")
<< " from source " << status.source()
<< " with message '" << status.message() << "'";
} else {
LOG(INFO) << "Task '" << status.task_id() << "'"
<< " is in state " << status.state();
}
if (internal::protobuf::isTerminalState(status.state())) {
if (status.state() == TASK_FINISHED) {
tasksFinished++;
}
tasksTerminated++;
activeTasks.erase(status.task_id());
}
if (totalTasks.isSome() && tasksTerminated == totalTasks.get()) {
if (tasksTerminated - tasksFinished > 0) {
EXIT(EXIT_FAILURE)
<< "Failed to complete successfully: "
<< stringify(tasksTerminated - tasksFinished)
<< " of " << stringify(totalTasks.get()) << " terminated abnormally";
} else {
driver->stop();
}
}
}
void frameworkMessage(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data) override
{
LOG(FATAL) << "Received framework message from executor '" << executorId
<< "' on agent " << slaveId << ": '" << data << "'";
}
void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId) override
{
LOG(INFO) << "Lost agent " << slaveId;
}
void executorLost(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
int status) override
{
LOG(INFO) << "Lost executor '" << executorId << "' on agent "
<< slaveId << ", " << WSTRINGIFY(status);
}
void error(SchedulerDriver* driver, const string& message) override
{
LOG(ERROR) << message;
}
private:
FrameworkInfo frameworkInfo;
string command;
Resources taskResources;
hashset<TaskID> activeTasks;
size_t tasksLaunched;
size_t tasksFinished;
size_t tasksTerminated;
Option<size_t> totalTasks;
};
class Flags : public virtual mesos::internal::examples::Flags
{
public:
Flags()
{
add(&Flags::command,
"command",
"The command to run for each task.",
"echo hello");
add(&Flags::task_resources,
"task_resources",
"The resources that the task uses.",
"cpus:0.1;mem:32;disk:32");
// TODO(bmahler): We need to take a separate flag for
// revocable resources because there is no support yet
// for specifying revocable resources in a resource string.
add(&Flags::task_revocable_resources,
"task_revocable_resources",
"The revocable resources that the task uses.");
add(&Flags::num_tasks,
"num_tasks",
"Optionally, the number of tasks to run to completion before exiting.\n"
"If unset, as many tasks as possible will be launched.");
}
string command;
string task_resources;
Option<string> task_revocable_resources;
Option<size_t> num_tasks;
};
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
logging::initialize(argv[0], true, flags); // Catch signals.
// Log any flag warnings.
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
FrameworkInfo framework;
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_principal(flags.principal);
framework.set_name(FRAMEWORK_NAME);
framework.set_checkpoint(flags.checkpoint);
framework.add_roles(flags.role);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::MULTI_ROLE);
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::RESERVATION_REFINEMENT);
if (flags.task_revocable_resources.isSome()) {
framework.add_capabilities()->set_type(
FrameworkInfo::Capability::REVOCABLE_RESOURCES);
}
Try<Resources> resources = Resources::parse(flags.task_resources);
if (resources.isError()) {
EXIT(EXIT_FAILURE) << "Invalid '--task_resources': " << resources.error();
}
Resources taskResources = resources.get();
if (flags.task_revocable_resources.isSome()) {
Try<Resources> revocableResources =
Resources::parse(flags.task_revocable_resources.get());
if (revocableResources.isError()) {
EXIT(EXIT_FAILURE) << "Invalid '--task_revocable_resources': "
<< revocableResources.error();
}
foreach (Resource revocable, revocableResources.get()) {
revocable.mutable_revocable();
taskResources += revocable;
}
}
taskResources.allocate(framework.roles(0));
NoExecutorScheduler scheduler(
framework,
flags.command,
taskResources,
flags.num_tasks);
if (flags.master == "local") {
// Configure master.
os::setenv("MESOS_ROLES", flags.role);
os::setenv("MESOS_AUTHENTICATE_FRAMEWORKS", stringify(flags.authenticate));
ACLs acls;
ACL::RegisterFramework* acl = acls.add_register_frameworks();
acl->mutable_principals()->set_type(ACL::Entity::ANY);
acl->mutable_roles()->add_values(flags.role);
os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
}
MesosSchedulerDriver* driver;
if (flags.authenticate) {
LOG(INFO) << "Enabling authentication for the framework";
Credential credential;
credential.set_principal(flags.principal);
if (flags.secret.isSome()) {
credential.set_secret(flags.secret.get());
}
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master,
credential);
} else {
driver = new MesosSchedulerDriver(
&scheduler,
framework,
flags.master);
}
int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
// Ensure that the driver process terminates.
driver->stop();
delete driver;
return status;
}