blob: c4c3aa68dc3e6e001f9a746ea5151b8ad958856f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include <mesos/scheduler.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
using namespace mesos;
using boost::lexical_cast;
using std::cout;
using std::cerr;
using std::endl;
using std::flush;
using std::string;
using std::vector;
const int32_t CPUS_PER_TASK = 1;
const int32_t MEM_PER_TASK = 32;
class LongLivedScheduler : public Scheduler
{
public:
explicit LongLivedScheduler(const ExecutorInfo& _executor)
: executor(_executor),
tasksLaunched(0) {}
virtual ~LongLivedScheduler() {}
virtual void registered(SchedulerDriver*,
const FrameworkID&,
const MasterInfo&)
{
cout << "Registered!" << endl;
}
virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {}
virtual void disconnected(SchedulerDriver* driver) {}
virtual void resourceOffers(SchedulerDriver* driver,
const vector<Offer>& offers)
{
cout << "." << flush;
for (size_t i = 0; i < offers.size(); i++) {
const Offer& offer = offers[i];
// Lookup resources we care about.
// TODO(benh): It would be nice to ultimately have some helper
// functions for looking up resources.
double cpus = 0;
double mem = 0;
for (int i = 0; i < offer.resources_size(); i++) {
const Resource& resource = offer.resources(i);
if (resource.name() == "cpus" &&
resource.type() == Value::SCALAR) {
cpus = resource.scalar().value();
} else if (resource.name() == "mem" &&
resource.type() == Value::SCALAR) {
mem = resource.scalar().value();
}
}
// Launch tasks (only one per offer).
vector<TaskInfo> tasks;
if (cpus >= CPUS_PER_TASK && mem >= MEM_PER_TASK) {
int taskId = tasksLaunched++;
cout << "Starting task " << taskId << " on "
<< offer.hostname() << endl;
TaskInfo task;
task.set_name("Task " + lexical_cast<string>(taskId));
task.mutable_task_id()->set_value(lexical_cast<string>(taskId));
task.mutable_slave_id()->MergeFrom(offer.slave_id());
task.mutable_executor()->MergeFrom(executor);
Resource* resource;
resource = task.add_resources();
resource->set_name("cpus");
resource->set_type(Value::SCALAR);
resource->mutable_scalar()->set_value(CPUS_PER_TASK);
resource = task.add_resources();
resource->set_name("mem");
resource->set_type(Value::SCALAR);
resource->mutable_scalar()->set_value(MEM_PER_TASK);
tasks.push_back(task);
cpus -= CPUS_PER_TASK;
mem -= MEM_PER_TASK;
}
driver->launchTasks(offer.id(), tasks);
}
}
virtual void offerRescinded(SchedulerDriver* driver,
const OfferID& offerId) {}
virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
{
int taskId = lexical_cast<int>(status.task_id().value());
cout << "Task " << taskId << " is in state " << status.state() << endl;
}
virtual void frameworkMessage(SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const string& data) {}
virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {}
virtual void executorLost(SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
int status) {}
virtual void error(SchedulerDriver* driver, const string& message) {}
private:
const ExecutorInfo executor;
string uri;
int tasksLaunched;
};
int main(int argc, char** argv)
{
if (argc != 2) {
cerr << "Usage: " << argv[0] << " <master>" << endl;
return -1;
}
// Find this executable's directory to locate executor.
string uri;
Option<string> value = os::getenv("MESOS_BUILD_DIR");
if (value.isSome()) {
uri = path::join(value.get(), "src", "long-lived-executor");
} else {
uri = path::join(
os::realpath(Path(argv[0]).dirname()).get(),
"long-lived-executor");
}
ExecutorInfo executor;
executor.mutable_executor_id()->set_value("default");
executor.mutable_command()->set_value(uri);
executor.set_name("Long Lived Executor (C++)");
executor.set_source("cpp_long_lived_framework");
LongLivedScheduler scheduler(executor);
FrameworkInfo framework;
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_name("Long Lived Framework (C++)");
value = os::getenv("MESOS_CHECKPOINT");
if (value.isSome()) {
framework.set_checkpoint(
numify<bool>(value.get()).get());
}
MesosSchedulerDriver* driver;
if (os::getenv("MESOS_AUTHENTICATE").isSome()) {
cout << "Enabling authentication for the framework" << endl;
value = os::getenv("DEFAULT_PRINCIPAL");
if (value.isNone()) {
EXIT(1) << "Expecting authentication principal in the environment";
}
Credential credential;
credential.set_principal(value.get());
framework.set_principal(value.get());
value = os::getenv("DEFAULT_SECRET");
if (value.isNone()) {
EXIT(1) << "Expecting authentication secret in the environment";
}
credential.set_secret(value.get());
driver = new MesosSchedulerDriver(
&scheduler, framework, argv[1], credential);
} else {
framework.set_principal("long-lived-framework-cpp");
driver = new MesosSchedulerDriver(
&scheduler, framework, argv[1]);
}
int status = driver->run() == DRIVER_STOPPED ? 0 : 1;
// Ensure that the driver process terminates.
driver->stop();
delete driver;
return status;
}