blob: e6e7137e6d4df40ceae7345500a14e7bee68b2c5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <libgen.h>
#include <iostream>
#include <string>
#include <vector>
#include <boost/lexical_cast.hpp>
#include <mesos/resources.hpp>
#include <mesos/scheduler.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/flags.hpp>
#include <stout/numify.hpp>
#include <stout/os.hpp>
#include <stout/stringify.hpp>
#include "logging/flags.hpp"
using namespace mesos;
using boost::lexical_cast;
using std::cerr;
using std::cout;
using std::endl;
using std::flush;
using std::string;
using std::vector;
using mesos::Resources;
// Amount of "cpus" requested for every task; TestScheduler::resourceOffers
// uses it to build the per-task resource string.
const int32_t CPUS_PER_TASK = 1;
// Amount of "mem" requested for every task; used alongside CPUS_PER_TASK
// when building the per-task resource string.
// NOTE(review): presumably megabytes, per the usual Mesos convention -- confirm.
const int32_t MEM_PER_TASK = 32;
// Scheduler that launches a fixed number of tasks (`totalTasks`) using the
// supplied executor, carving each task's resources out of incoming offers,
// and stops the driver once every task has reported TASK_FINISHED.
//
// If any task reports TASK_LOST, TASK_KILLED or TASK_FAILED the driver is
// aborted instead, so that the process does not wait forever for a task
// that will never finish.
class TestScheduler : public Scheduler
{
public:
  // `_executor` is attached to every launched task; `_role` is passed to
  // Resources::find() when picking resources out of an offer.
  TestScheduler(const ExecutorInfo& _executor, const string& _role)
    : executor(_executor),
      role(_role),
      tasksLaunched(0),
      tasksFinished(0),
      totalTasks(5) {}

  virtual ~TestScheduler() {}

  virtual void registered(SchedulerDriver*,
                          const FrameworkID&,
                          const MasterInfo&)
  {
    cout << "Registered!" << endl;
  }

  virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {}

  virtual void disconnected(SchedulerDriver* driver) {}

  // For each offer, packs in as many tasks as the offered resources allow
  // (bounded by the number of tasks still to be launched) and launches them.
  virtual void resourceOffers(SchedulerDriver* driver,
                              const vector<Offer>& offers)
  {
    cout << "." << flush;

    for (size_t i = 0; i < offers.size(); i++) {
      const Offer& offer = offers[i];

      // Resources required by a single task; parsed once and reused.
      static const Resources TASK_RESOURCES = Resources::parse(
          "cpus:" + stringify(CPUS_PER_TASK) +
          ";mem:" + stringify(MEM_PER_TASK)).get();

      Resources remaining = offer.resources();

      // Launch tasks.
      vector<TaskInfo> tasks;
      while (tasksLaunched < totalTasks &&
             TASK_RESOURCES <= remaining.flatten()) {
        int taskId = tasksLaunched++;

        cout << "Starting task " << taskId << " on "
             << offer.hostname() << endl;

        TaskInfo task;
        task.set_name("Task " + lexical_cast<string>(taskId));
        task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
        task.mutable_slave_id()->MergeFrom(offer.slave_id());
        task.mutable_executor()->MergeFrom(executor);

        Option<Resources> resources = remaining.find(TASK_RESOURCES, role);
        CHECK_SOME(resources);

        task.mutable_resources()->MergeFrom(resources.get());
        remaining -= resources.get();

        tasks.push_back(task);
      }

      // Called even when `tasks` is empty, exactly as the loop above left it.
      driver->launchTasks(offer.id(), tasks);
    }
  }

  virtual void offerRescinded(SchedulerDriver* driver,
                              const OfferID& offerId) {}

  // Logs the update, counts finished tasks, and stops the driver once all
  // tasks have finished. Aborts the driver if a task ends unsuccessfully.
  virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
  {
    // Print the id as received: converting it to an integer could throw out
    // of this callback for an id that is not numeric.
    const string& taskId = status.task_id().value();

    cout << "Task " << taskId << " is in state " << status.state() << endl;

    // A task in one of these states will never reach TASK_FINISHED, so
    // waiting for `tasksFinished == totalTasks` would hang forever.
    if (status.state() == TASK_LOST ||
        status.state() == TASK_KILLED ||
        status.state() == TASK_FAILED) {
      cout << "Aborting because task " << taskId
           << " is in unexpected state " << status.state() << endl;
      driver->abort();
      return;
    }

    if (status.state() == TASK_FINISHED) {
      tasksFinished++;
    }

    if (tasksFinished == totalTasks) {
      driver->stop();
    }
  }

  virtual void frameworkMessage(SchedulerDriver* driver,
                                const ExecutorID& executorId,
                                const SlaveID& slaveId,
                                const string& data) {}

  virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {}

  virtual void executorLost(SchedulerDriver* driver,
                            const ExecutorID& executorID,
                            const SlaveID& slaveID,
                            int status) {}

  virtual void error(SchedulerDriver* driver, const string& message)
  {
    cout << message << endl;
  }

private:
  const ExecutorInfo executor; // Executor attached to every launched task.
  string role;                 // Role used when selecting offered resources.
  int tasksLaunched;           // Tasks handed to the driver so far.
  int tasksFinished;           // Tasks that have reported TASK_FINISHED.
  int totalTasks;              // Number of tasks to run before stopping.
};
// Writes a usage banner to stderr: the program's base name (derived from
// `argv0`) followed by the option help text produced by `flags`.
void usage(const char* argv0, const flags::FlagsBase& flags)
{
  cerr << "Usage: " << os::basename(argv0).get() << " [...]" << endl;
  cerr << endl;
  cerr << "Supported options:" << endl;
  cerr << flags.usage();
}
int main(int argc, char** argv)
{
// Find this executable's directory to locate executor.
string path = os::realpath(dirname(argv[0])).get();
string uri = path + "/test-executor";
if (getenv("MESOS_BUILD_DIR")) {
uri = string(getenv("MESOS_BUILD_DIR")) + "/src/test-executor";
}
mesos::internal::logging::Flags flags;
string role;
flags.add(&role,
"role",
"Role to use when registering",
"*");
Option<string> master;
flags.add(&master,
"master",
"ip:port of master to connect");
Try<Nothing> load = flags.load(None(), argc, argv);
if (load.isError()) {
cerr << load.error() << endl;
usage(argv[0], flags);
exit(1);
} else if (master.isNone()) {
cerr << "Missing --master" << endl;
usage(argv[0], flags);
exit(1);
}
ExecutorInfo executor;
executor.mutable_executor_id()->set_value("default");
executor.mutable_command()->set_value(uri);
executor.set_name("Test Executor (C++)");
executor.set_source("cpp_test");
TestScheduler scheduler(executor, role);
FrameworkInfo framework;
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_name("Test Framework (C++)");
framework.set_role(role);
if (os::hasenv("MESOS_CHECKPOINT")) {
framework.set_checkpoint(
numify<bool>(os::getenv("MESOS_CHECKPOINT")).get());
}
MesosSchedulerDriver* driver;
if (os::hasenv("MESOS_AUTHENTICATE")) {
cout << "Enabling authentication for the framework" << endl;
if (!os::hasenv("DEFAULT_PRINCIPAL")) {
EXIT(1) << "Expecting authentication principal in the environment";
}
if (!os::hasenv("DEFAULT_SECRET")) {
EXIT(1) << "Expecting authentication secret in the environment";
}
Credential credential;
credential.set_principal(getenv("DEFAULT_PRINCIPAL"));
credential.set_secret(getenv("DEFAULT_SECRET"));
framework.set_principal(getenv("DEFAULT_PRINCIPAL"));
driver = new MesosSchedulerDriver(
&scheduler, framework, master.get(), credential);
} else {
framework.set_principal("test-framework-cpp");
driver = new MesosSchedulerDriver(
&scheduler, framework, master.get());
}
int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
// Ensure that the driver process terminates.
driver->stop();
delete driver;
return status;
}