| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <libgen.h> |
| |
| #include <iostream> |
| #include <string> |
| |
| #include <boost/lexical_cast.hpp> |
| |
| #include <mesos/resources.hpp> |
| #include <mesos/scheduler.hpp> |
| |
| #include <stout/check.hpp> |
| #include <stout/exit.hpp> |
| #include <stout/flags.hpp> |
| #include <stout/numify.hpp> |
| #include <stout/os.hpp> |
| #include <stout/stringify.hpp> |
| |
| #include "logging/flags.hpp" |
| |
| using namespace mesos; |
| |
| using boost::lexical_cast; |
| |
| using std::cerr; |
| using std::cout; |
| using std::endl; |
| using std::flush; |
| using std::string; |
| using std::vector; |
| |
| using mesos::Resources; |
| |
| const int32_t CPUS_PER_TASK = 1; |
| const int32_t MEM_PER_TASK = 32; |
| |
| class TestScheduler : public Scheduler |
| { |
| public: |
| TestScheduler(const ExecutorInfo& _executor, const string& _role) |
| : executor(_executor), |
| role(_role), |
| tasksLaunched(0), |
| tasksFinished(0), |
| totalTasks(5) {} |
| |
| virtual ~TestScheduler() {} |
| |
| virtual void registered(SchedulerDriver*, |
| const FrameworkID&, |
| const MasterInfo&) |
| { |
| cout << "Registered!" << endl; |
| } |
| |
| virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {} |
| |
| virtual void disconnected(SchedulerDriver* driver) {} |
| |
| virtual void resourceOffers(SchedulerDriver* driver, |
| const vector<Offer>& offers) |
| { |
| cout << "." << flush; |
| for (size_t i = 0; i < offers.size(); i++) { |
| const Offer& offer = offers[i]; |
| |
| static const Resources TASK_RESOURCES = Resources::parse( |
| "cpus:" + stringify(CPUS_PER_TASK) + |
| ";mem:" + stringify(MEM_PER_TASK)).get(); |
| |
| Resources remaining = offer.resources(); |
| |
| // Launch tasks. |
| vector<TaskInfo> tasks; |
| while (tasksLaunched < totalTasks && |
| TASK_RESOURCES <= remaining.flatten()) { |
| int taskId = tasksLaunched++; |
| |
| cout << "Starting task " << taskId << " on " |
| << offer.hostname() << endl; |
| |
| TaskInfo task; |
| task.set_name("Task " + lexical_cast<string>(taskId)); |
| task.mutable_task_id()->set_value(lexical_cast<string>(taskId)); |
| task.mutable_slave_id()->MergeFrom(offer.slave_id()); |
| task.mutable_executor()->MergeFrom(executor); |
| |
| Option<Resources> resources = remaining.find(TASK_RESOURCES, role); |
| CHECK_SOME(resources); |
| task.mutable_resources()->MergeFrom(resources.get()); |
| remaining -= resources.get(); |
| |
| tasks.push_back(task); |
| } |
| |
| driver->launchTasks(offer.id(), tasks); |
| } |
| } |
| |
| virtual void offerRescinded(SchedulerDriver* driver, |
| const OfferID& offerId) {} |
| |
| virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) |
| { |
| int taskId = lexical_cast<int>(status.task_id().value()); |
| |
| cout << "Task " << taskId << " is in state " << status.state() << endl; |
| |
| if (status.state() == TASK_FINISHED) |
| tasksFinished++; |
| |
| if (tasksFinished == totalTasks) |
| driver->stop(); |
| } |
| |
| virtual void frameworkMessage(SchedulerDriver* driver, |
| const ExecutorID& executorId, |
| const SlaveID& slaveId, |
| const string& data) {} |
| |
| virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {} |
| |
| virtual void executorLost(SchedulerDriver* driver, |
| const ExecutorID& executorID, |
| const SlaveID& slaveID, |
| int status) {} |
| |
| virtual void error(SchedulerDriver* driver, const string& message) |
| { |
| cout << message << endl; |
| } |
| |
| private: |
| const ExecutorInfo executor; |
| string role; |
| int tasksLaunched; |
| int tasksFinished; |
| int totalTasks; |
| }; |
| |
| |
| void usage(const char* argv0, const flags::FlagsBase& flags) |
| { |
| cerr << "Usage: " << os::basename(argv0).get() << " [...]" << endl |
| << endl |
| << "Supported options:" << endl |
| << flags.usage(); |
| } |
| |
| |
| int main(int argc, char** argv) |
| { |
| // Find this executable's directory to locate executor. |
| string path = os::realpath(dirname(argv[0])).get(); |
| string uri = path + "/test-executor"; |
| if (getenv("MESOS_BUILD_DIR")) { |
| uri = string(getenv("MESOS_BUILD_DIR")) + "/src/test-executor"; |
| } |
| |
| mesos::internal::logging::Flags flags; |
| |
| string role; |
| flags.add(&role, |
| "role", |
| "Role to use when registering", |
| "*"); |
| |
| Option<string> master; |
| flags.add(&master, |
| "master", |
| "ip:port of master to connect"); |
| |
| Try<Nothing> load = flags.load(None(), argc, argv); |
| |
| if (load.isError()) { |
| cerr << load.error() << endl; |
| usage(argv[0], flags); |
| exit(1); |
| } else if (master.isNone()) { |
| cerr << "Missing --master" << endl; |
| usage(argv[0], flags); |
| exit(1); |
| } |
| |
| ExecutorInfo executor; |
| executor.mutable_executor_id()->set_value("default"); |
| executor.mutable_command()->set_value(uri); |
| executor.set_name("Test Executor (C++)"); |
| executor.set_source("cpp_test"); |
| |
| TestScheduler scheduler(executor, role); |
| |
| FrameworkInfo framework; |
| framework.set_user(""); // Have Mesos fill in the current user. |
| framework.set_name("Test Framework (C++)"); |
| framework.set_role(role); |
| |
| if (os::hasenv("MESOS_CHECKPOINT")) { |
| framework.set_checkpoint( |
| numify<bool>(os::getenv("MESOS_CHECKPOINT")).get()); |
| } |
| |
| MesosSchedulerDriver* driver; |
| if (os::hasenv("MESOS_AUTHENTICATE")) { |
| cout << "Enabling authentication for the framework" << endl; |
| |
| if (!os::hasenv("DEFAULT_PRINCIPAL")) { |
| EXIT(1) << "Expecting authentication principal in the environment"; |
| } |
| |
| if (!os::hasenv("DEFAULT_SECRET")) { |
| EXIT(1) << "Expecting authentication secret in the environment"; |
| } |
| |
| Credential credential; |
| credential.set_principal(getenv("DEFAULT_PRINCIPAL")); |
| credential.set_secret(getenv("DEFAULT_SECRET")); |
| |
| framework.set_principal(getenv("DEFAULT_PRINCIPAL")); |
| |
| driver = new MesosSchedulerDriver( |
| &scheduler, framework, master.get(), credential); |
| } else { |
| framework.set_principal("test-framework-cpp"); |
| |
| driver = new MesosSchedulerDriver( |
| &scheduler, framework, master.get()); |
| } |
| |
| int status = driver->run() == DRIVER_STOPPED ? 0 : 1; |
| |
| // Ensure that the driver process terminates. |
| driver->stop(); |
| |
| delete driver; |
| return status; |
| } |