blob: 42e4e279d059801cd85955fd04995b60051a2b5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __MESOS_SCHEDULER_HPP__
#define __MESOS_SCHEDULER_HPP__
#if __cplusplus >= 201103L
#include <functional>
#else // __cplusplus >= 201103L
#include <tr1/functional>
#endif // __cplusplus >= 201103L
#include <queue>
#include <pthread.h>
#include <string>
#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/scheduler/scheduler.hpp>
/**
* Mesos scheduler interface and scheduler driver. A scheduler is used
* to interact with Mesos in order to run distributed computations.
*
* IF YOU FIND YOURSELF MODIFYING COMMENTS HERE PLEASE CONSIDER MAKING
* THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
* src/java/src/org/apache/mesos, Python: src/python/src, etc.).
*/
namespace mesos {
// Forward declarations. SchedulerDriver is referenced by the Scheduler
// callbacks before its definition further down; the process/detector
// types are only ever held by pointer in this header.
class SchedulerDriver;

namespace internal {
class MasterDetector;
class SchedulerProcess;
} // namespace internal {

namespace scheduler {
class MesosProcess;
} // namespace scheduler {
/**
 * Callback interface that a framework's scheduler implements.
 *
 * The driver invokes only one callback at a time. Blocking inside a
 * callback is therefore discouraged, because it may cause a deadlock.
 *
 * Every callback is handed a pointer to the scheduler driver that is
 * running this scheduler. That pointer does not change for the
 * lifetime of the scheduler (i.e., from SchedulerDriver::start() until
 * SchedulerDriver::join() returns), so a scheduler does not need to
 * keep its own pointer to the driver.
 *
 * NOTE: keep the declaration order of the virtual functions below
 * stable; on common C++ ABIs it determines the vtable layout that
 * already-compiled frameworks depend on.
 */
class Scheduler
{
public:
  /**
   * Virtual so that a subclass deleted through a Scheduler pointer is
   * destroyed correctly. Intentionally empty.
   */
  virtual ~Scheduler() {}

  /**
   * Called when the scheduler has successfully registered with a
   * Mesos master.
   *
   * @param frameworkId unique ID, generated by the master, that
   *     distinguishes this framework from all others.
   * @param masterInfo describes the current master, including its ip
   *     and port.
   */
  virtual void registered(
      SchedulerDriver* driver,
      const FrameworkID& frameworkId,
      const MasterInfo& masterInfo) = 0;

  /**
   * Called when the scheduler re-registers with a newly elected Mesos
   * master. Only invoked if the scheduler had registered before.
   *
   * @param masterInfo updated information about the elected master.
   */
  virtual void reregistered(
      SchedulerDriver* driver,
      const MasterInfo& masterInfo) = 0;

  /**
   * Called when the scheduler becomes "disconnected" from the master,
   * e.g., because the master failed and another one is taking over.
   */
  virtual void disconnected(SchedulerDriver* driver) = 0;

  /**
   * Called when resources are offered to this framework. Each offer
   * contains resources from exactly one slave.
   *
   * The resources in an offer are not offered to _this_ framework
   * again until either (a) the framework rejects them (see
   * SchedulerDriver::launchTasks) or (b) they are rescinded (see
   * Scheduler::offerRescinded).
   *
   * Depending on the allocator in use, the same resources may be
   * offered to several frameworks concurrently. The first framework
   * to launch tasks with them gets to use them; the other frameworks
   * have those resources rescinded. A framework that has already
   * launched tasks on those resources will see those tasks fail with
   * a TASK_LOST status and a message explaining why.
   */
  virtual void resourceOffers(
      SchedulerDriver* driver,
      const std::vector<Offer>& offers) = 0;

  /**
   * Called when an offer is no longer valid (e.g., its slave was lost
   * or another framework used its resources). If an offer is never
   * rescinded for some reason (e.g., a dropped message or a failing
   * over framework), a framework that launches tasks on that invalid
   * offer receives TASK_LOST status updates for those tasks (see
   * Scheduler::resourceOffers).
   */
  virtual void offerRescinded(
      SchedulerDriver* driver,
      const OfferID& offerId) = 0;

  /**
   * Called when the status of a task changes (e.g., a slave is lost
   * and the task with it, or a task finishes and its executor sends a
   * status update saying so).
   *
   * Returning from this callback _acknowledges_ receipt of the status
   * update. If the scheduler aborts during this callback (or the
   * process exits), the update will be delivered again -- except,
   * currently, when the slave that sent the update is lost or fails
   * in the meantime.
   */
  virtual void statusUpdate(
      SchedulerDriver* driver,
      const TaskStatus& status) = 0;

  /**
   * Called when an executor sends a message. Delivery is best effort:
   * do not expect framework messages to be retransmitted reliably.
   */
  virtual void frameworkMessage(
      SchedulerDriver* driver,
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      const std::string& data) = 0;

  /**
   * Called when a slave is determined to be unreachable (e.g., machine
   * failure or network partition). Most frameworks will need to
   * reschedule the tasks launched on this slave onto a new slave.
   */
  virtual void slaveLost(
      SchedulerDriver* driver,
      const SlaveID& slaveId) = 0;

  /**
   * Called when an executor has exited or terminated. TASK_LOST status
   * updates are generated automatically for any tasks it was running.
   *
   * @param status termination status reported for the executor
   *     (presumably its exit status -- TODO confirm in the driver).
   */
  virtual void executorLost(
      SchedulerDriver* driver,
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      int status) = 0;

  /**
   * Called on an unrecoverable error in the scheduler or the scheduler
   * driver. The driver has already been aborted BEFORE this callback
   * runs.
   */
  virtual void error(SchedulerDriver* driver, const std::string& message) = 0;
};
/**
 * Abstract interface for connecting a scheduler to Mesos. It covers
 * both the scheduler's lifecycle (starting it, stopping it, waiting
 * for it to finish) and its interaction with Mesos (launching tasks,
 * killing tasks, etc.). MesosSchedulerDriver, declared further down
 * in this header, is a concrete implementation.
 *
 * NOTE: keep the declaration order of the virtual functions below
 * stable; on common C++ ABIs it determines the vtable layout that
 * already-compiled code depends on.
 */
class SchedulerDriver
{
public:
  /**
   * Virtual so that a subclass deleted through a SchedulerDriver
   * pointer is destroyed correctly. Intentionally empty. 'stop()' is
   * expected to have been called before the driver is destroyed.
   */
  virtual ~SchedulerDriver() {}

  /**
   * Starts the scheduler driver. Must be called before any other
   * driver call is made.
   */
  virtual Status start() = 0;

  /**
   * Stops the scheduler driver.
   *
   * With 'failover' false (the default), the framework is expected to
   * never reconnect to Mesos, so Mesos unregisters it and shuts down
   * all of its tasks and executors. With 'failover' true, all of its
   * executors and tasks keep running (for a framework-specific
   * failover timeout) so that the scheduler can reconnect -- possibly
   * from the same process, or from a different process, e.g., on a
   * different machine.
   */
  virtual Status stop(bool failover = false) = 0;

  /**
   * Aborts the driver so that no further callbacks are made to the
   * scheduler. Abort and stop are deliberately separate so that code
   * can detect an aborted driver (via the return status of
   * SchedulerDriver::join) and, if desired, create and start another
   * driver within the same process. 'abort()' does not call 'stop()'.
   */
  virtual Status abort() = 0;

  /**
   * Waits until the driver has been stopped or aborted, which may
   * _block_ the calling thread indefinitely. The returned status tells
   * whether the driver was aborted (see mesos.proto for a description
   * of Status).
   */
  virtual Status join() = 0;

  /**
   * Starts the driver and then immediately joins (i.e., blocks on) it.
   */
  virtual Status run() = 0;

  /**
   * Asks Mesos for resources (see mesos.proto for a description of
   * Request, including how to ask for resources on specific slaves).
   * Any available resources are offered to the framework
   * asynchronously through the Scheduler::resourceOffers callback.
   */
  virtual Status requestResources(const std::vector<Request>& requests) = 0;

  /**
   * Launches the given tasks on the resources of the given offers.
   *
   * The resources of all listed offers are aggregated; all offers
   * must belong to the same slave. Any resources not used by the tasks
   * or their executors are considered declined, and 'filters' is
   * applied to those unused resources (see mesos.proto for a
   * description of Filters). Passing an empty list of tasks declines
   * the offers in their entirety (see SchedulerDriver::declineOffer).
   */
  virtual Status launchTasks(
      const std::vector<OfferID>& offerIds,
      const std::vector<TaskInfo>& tasks,
      const Filters& filters = Filters()) = 0;

  /**
   * DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
   */
  virtual Status launchTasks(
      const OfferID& offerId,
      const std::vector<TaskInfo>& tasks,
      const Filters& filters = Filters()) = 0;

  /**
   * Kills the given task. Killing a task is currently not reliable:
   * a scheduler that fails over while trying to kill a task must
   * retry later, and a request made while unregistered/disconnected is
   * dropped. These semantics may change in the future.
   */
  virtual Status killTask(const TaskID& taskId) = 0;

  /**
   * Declines an offer in its entirety and applies 'filters' to its
   * resources (see mesos.proto for a description of Filters). This can
   * be done at any time; it need not happen inside the
   * Scheduler::resourceOffers callback.
   */
  virtual Status declineOffer(
      const OfferID& offerId,
      const Filters& filters = Filters()) = 0;

  /**
   * Removes every filter the framework has previously set (e.g., via
   * launchTasks() or declineOffer()), so that the framework can again
   * receive offers from the slaves that were filtered out.
   */
  virtual Status reviveOffers() = 0;

  /**
   * Sends a message from the framework to one of its executors.
   * Delivery is best effort: do not expect framework messages to be
   * retransmitted reliably.
   */
  virtual Status sendFrameworkMessage(
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      const std::string& data) = 0;

  /**
   * Lets the framework query the status of its non-terminal tasks.
   * The master sends back the latest status of each task in
   * 'statuses', if possible; tasks it no longer knows about produce a
   * TASK_LOST update. If 'statuses' is empty, the master sends the
   * latest status of every task it currently knows about.
   */
  virtual Status reconcileTasks(const std::vector<TaskStatus>& statuses) = 0;
};
/**
 * Concrete implementation of a SchedulerDriver that connects a
 * Scheduler with a Mesos master. The MesosSchedulerDriver is
 * thread-safe.
 *
 * Note that scheduler failover is supported in Mesos. After a
 * scheduler is registered with Mesos it may failover (to a new
 * process on the same machine or across multiple machines) by
 * creating a new driver with the ID given to it in
 * Scheduler::registered.
 *
 * The driver is responsible for invoking the Scheduler callbacks as
 * it communicates with the Mesos master.
 *
 * Note that blocking on the MesosSchedulerDriver (e.g., via
 * MesosSchedulerDriver::join) doesn't affect the scheduler callbacks
 * in any way because they are handled by a different thread.
 *
 * Note that the driver uses GLOG to do its own logging. GLOG flags can
 * be set via environment variables, prefixing the flag name with
 * "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
 * src/logging/flags.hpp. Mesos flags can also be set via environment
 * variables, prefixing the flag name with "MESOS_", e.g.,
 * "MESOS_QUIET=1".
 *
 * A driver cannot be copied (see the private declarations below);
 * hold it by pointer or reference instead.
 *
 * See src/examples/test_framework.cpp for an example of using the
 * MesosSchedulerDriver.
 */
class MesosSchedulerDriver : public SchedulerDriver
{
public:
  /**
   * Creates a new driver for the specified scheduler. The master
   * should be one of:
   *
   *     host:port
   *     zk://host1:port1,host2:port2,.../path
   *     zk://username:password@host1:port1,host2:port2,.../path
   *     file:///path/to/file (where file contains one of the above)
   *
   * The driver will attempt to "failover" if the specified
   * FrameworkInfo includes a valid FrameworkID.
   *
   * Any Mesos configuration options are read from environment
   * variables, as well as any configuration files found through the
   * environment variables.
   *
   * TODO(vinod): Deprecate this once 'MesosSchedulerDriver' can
   * take 'Option<Credential>' as parameter. Currently it cannot
   * because 'stout' is not visible from here.
   */
  MesosSchedulerDriver(Scheduler* scheduler,
                       const FrameworkInfo& framework,
                       const std::string& master);

  /**
   * Same as the above constructor but takes 'credential' as argument.
   *
   * The credential will be used for authenticating with the master.
   */
  MesosSchedulerDriver(Scheduler* scheduler,
                       const FrameworkInfo& framework,
                       const std::string& master,
                       const Credential& credential);

  /**
   * This destructor will block indefinitely if
   * MesosSchedulerDriver::start was invoked successfully (possibly
   * via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
   * not been invoked.
   */
  virtual ~MesosSchedulerDriver();

  /**
   * See SchedulerDriver for descriptions of these.
   */
  virtual Status start();
  virtual Status stop(bool failover = false);
  virtual Status abort();
  virtual Status join();
  virtual Status run();

  virtual Status requestResources(const std::vector<Request>& requests);

  /**
   * TODO(nnielsen): launchTasks using single offer is deprecated.
   * Use launchTasks with offer list instead.
   */
  virtual Status launchTasks(const OfferID& offerId,
                             const std::vector<TaskInfo>& tasks,
                             const Filters& filters = Filters());

  virtual Status launchTasks(const std::vector<OfferID>& offerIds,
                             const std::vector<TaskInfo>& tasks,
                             const Filters& filters = Filters());

  virtual Status killTask(const TaskID& taskId);

  virtual Status declineOffer(const OfferID& offerId,
                              const Filters& filters = Filters());

  virtual Status reviveOffers();

  virtual Status sendFrameworkMessage(const ExecutorID& executorId,
                                      const SlaveID& slaveId,
                                      const std::string& data);

  virtual Status reconcileTasks(
      const std::vector<TaskStatus>& statuses);

protected:
  // Used to detect (i.e., choose) the master.
  internal::MasterDetector* detector;

private:
  // Copying is disabled. The driver embeds a pthread mutex and
  // condition variable, and POSIX leaves any use of a *copy* of those
  // objects undefined. It also keeps raw pointers to per-driver state
  // ('process', 'detector', 'credential'), which a member-wise copy
  // would silently share. Declared but deliberately never defined
  // (the pre-C++11 idiom, since this header must also build without
  // C++11); any attempted copy fails to compile or link.
  MesosSchedulerDriver(const MesosSchedulerDriver&);
  MesosSchedulerDriver& operator=(const MesosSchedulerDriver&);

  void initialize();

  // NOTE: data-member order is kept as-is; constructors in the
  // implementation file initialize members in declaration order.
  Scheduler* scheduler;
  FrameworkInfo framework;
  std::string master;

  // Used for communicating with the master.
  internal::SchedulerProcess* process;

  // URL for the master (e.g., zk://, file://, etc).
  std::string url;

  // Mutex to enforce all non-callbacks are executed serially.
  pthread_mutex_t mutex;

  // Condition variable for waiting until driver terminates.
  pthread_cond_t cond;

  // Current status of the driver.
  Status status;

  // Credential used to authenticate with the master; presumably NULL
  // when the driver was built without one (TODO confirm in the .cpp).
  const Credential* credential;

  // Scheduler process ID.
  std::string schedulerId;
};
namespace scheduler {
/**
 * Interface to Mesos for a scheduler. Abstracts master detection
 * (connection and disconnection) and authentication if some
 * credentials are provided.
 *
 * Expects three callbacks, 'connected', 'disconnected', and
 * 'received' which will get invoked _serially_ when it's determined
 * that we've connected, disconnected, or received events from the
 * master. Note that we drop events while disconnected but it's
 * possible to receive a batch of events across a
 * disconnected/connected transition before getting the disconnected
 * and then connected callback.
 *
 * Instances cannot be copied (see the private declarations below).
 *
 * TODO(benh): Don't include events in 'received' that occurred after a
 * disconnected/connected transition.
 */
class Mesos
{
public:
  Mesos(const std::string& master,
#if __cplusplus >= 201103L
        const std::function<void(void)>& connected,
        const std::function<void(void)>& disconnected,
        const std::function<void(const std::queue<Event>&)>& received);
#else // __cplusplus >= 201103L
        const std::tr1::function<void(void)>& connected,
        const std::tr1::function<void(void)>& disconnected,
        const std::tr1::function<void(const std::queue<Event>&)>& received);
#endif // __cplusplus >= 201103L

  /**
   * Same as the above constructor but takes 'credential' as argument.
   *
   * The credential will be used for authenticating with the master.
   */
  Mesos(const std::string& master,
        const Credential& credential,
#if __cplusplus >= 201103L
        const std::function<void(void)>& connected,
        const std::function<void(void)>& disconnected,
        const std::function<void(const std::queue<Event>&)>& received);
#else // __cplusplus >= 201103L
        const std::tr1::function<void(void)>& connected,
        const std::tr1::function<void(void)>& disconnected,
        const std::tr1::function<void(const std::queue<Event>&)>& received);
#endif // __cplusplus >= 201103L

  virtual ~Mesos();

  /**
   * Attempts to send a call to the master.
   *
   * Some local validation of calls is performed which may generate
   * events without ever being sent to the master. This includes when
   * calls are sent but no master is currently detected (i.e., we're
   * disconnected).
   */
  virtual void send(const Call& call);

private:
  // Copying is disabled: each instance holds a raw pointer to its own
  // MesosProcess and has a user-declared destructor (which presumably
  // tears that process down -- TODO confirm in the implementation).
  // A member-wise copy would share the pointer between two owners.
  // Declared but deliberately never defined (pre-C++11 idiom, since
  // this header must also build without C++11).
  Mesos(const Mesos&);
  Mesos& operator=(const Mesos&);

  MesosProcess* process;
};
} // namespace scheduler {
} // namespace mesos {
#endif // __MESOS_SCHEDULER_HPP__