// blob: 423235a90166e71866a88baa3bf16ba551010c5c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __MESOS_SCHEDULER_HPP__
#define __MESOS_SCHEDULER_HPP__
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include <mesos/mesos.hpp>
// Mesos scheduler interface and scheduler driver. A scheduler is used
// to interact with Mesos in order to run distributed computations.
//
// IF YOU FIND YOURSELF MODIFYING COMMENTS HERE PLEASE CONSIDER MAKING
// THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
// src/java/src/org/apache/mesos, Python: src/python/src, etc.).
// Forward declaration of process::Latch. This header only refers to
// the type through a pointer, so its full definition is not needed
// here.
namespace process { class Latch; }
namespace mesos {
// Forward declarations. The declarations in this header do not need
// the complete definitions of any of these types.
class SchedulerDriver;

namespace scheduler { class MesosProcess; }

namespace internal { class SchedulerProcess; }

namespace master {
namespace detector { class MasterDetector; }
} // namespace master {
// Callback interface to be implemented by frameworks' schedulers.
// Note that only one callback will be invoked at a time, so it is not
// recommended that you block within a callback because it may cause a
// deadlock.
//
// Each callback includes a pointer to the scheduler driver that was
// used to run this scheduler. The pointer will not change for the
// duration of a scheduler (i.e., from the point you do
// SchedulerDriver::start() to the point that SchedulerDriver::join()
// returns). This is intended for convenience so that a scheduler
// doesn't need to store a pointer to the driver itself.
class Scheduler
{
public:
// Empty virtual destructor (necessary to instantiate subclasses).
virtual ~Scheduler() {}
// Invoked when the scheduler successfully registers with a Mesos
// master. A unique ID (generated by the master) used for
// distinguishing this framework from others and MasterInfo with the
// ip and port of the current master are provided as arguments.
virtual void registered(
SchedulerDriver* driver,
const FrameworkID& frameworkId,
const MasterInfo& masterInfo) = 0;
// Invoked when the scheduler re-registers with a newly elected
// Mesos master. This is only called when the scheduler has
// previously been registered. MasterInfo containing the updated
// information about the elected master is provided as an argument.
virtual void reregistered(
SchedulerDriver* driver,
const MasterInfo& masterInfo) = 0;
// Invoked when the scheduler becomes "disconnected" from the master
// (e.g., the master fails and another is taking over).
virtual void disconnected(SchedulerDriver* driver) = 0;
// Invoked when resources have been offered to this framework. A
// single offer will only contain resources from a single slave.
// Resources associated with an offer will not be re-offered to
// _this_ framework until either (a) this framework has rejected
// those resources (see SchedulerDriver::launchTasks) or (b) those
// resources have been rescinded (see Scheduler::offerRescinded).
// Note that resources may be concurrently offered to more than one
// framework at a time (depending on the allocator being used). In
// that case, the first framework to launch tasks using those
// resources will be able to use them while the other frameworks
// will have those resources rescinded (or if a framework has
// already launched tasks with those resources then those tasks will
// fail with a TASK_LOST status and a message saying as much).
virtual void resourceOffers(
SchedulerDriver* driver,
const std::vector<Offer>& offers) = 0;
// Invoked when an offer is no longer valid (e.g., the slave was
// lost or another framework used resources in the offer). If for
// whatever reason an offer is never rescinded (e.g., dropped
// message, failing over framework, etc.), a framework that attempts
// to launch tasks using an invalid offer will receive TASK_LOST
// status updates for those tasks (see Scheduler::resourceOffers).
virtual void offerRescinded(
SchedulerDriver* driver,
const OfferID& offerId) = 0;
// Invoked when the status of a task has changed (e.g., a slave is
// lost and so the task is lost, a task finishes and an executor
// sends a status update saying so, etc). If implicit
// acknowledgements are being used, then returning from this
// callback _acknowledges_ receipt of this status update! If for
// whatever reason the scheduler aborts during this callback (or
// the process exits) another status update will be delivered (note,
// however, that this is currently not true if the slave sending the
// status update is lost/fails during that time). If explicit
// acknowledgements are in use, the scheduler must acknowledge this
// status on the driver.
virtual void statusUpdate(
SchedulerDriver* driver,
const TaskStatus& status) = 0;
// Invoked when an executor sends a message. These messages are best
// effort; do not expect a framework message to be retransmitted in
// any reliable fashion.
virtual void frameworkMessage(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
const std::string& data) = 0;
// Invoked when a slave has been determined unreachable (e.g.,
// machine failure, network partition). Most frameworks will need to
// reschedule any tasks launched on this slave on a new slave.
//
// NOTE: This callback is not reliably delivered. If a host or
// network failure causes messages between the master and the
// scheduler to be dropped, this callback may not be invoked.
virtual void slaveLost(
SchedulerDriver* driver,
const SlaveID& slaveId) = 0;
// Invoked when an executor has exited/terminated. Note that any
// tasks running will have TASK_LOST status updates automagically
// generated.
//
// NOTE: This callback is not reliably delivered. If a host or
// network failure causes messages between the master and the
// scheduler to be dropped, this callback may not be invoked.
virtual void executorLost(
SchedulerDriver* driver,
const ExecutorID& executorId,
const SlaveID& slaveId,
int status) = 0;
// Invoked when there is an unrecoverable error in the scheduler or
// scheduler driver. The driver will be aborted BEFORE invoking this
// callback.
virtual void error(
SchedulerDriver* driver,
const std::string& message) = 0;
};
// Abstract interface that connects a scheduler to Mesos. It serves
// two purposes: managing the scheduler's lifecycle (start it, stop
// it, or wait for it to finish) and interacting with Mesos (e.g.,
// launch tasks, kill tasks, etc.). MesosSchedulerDriver, declared
// later in this file, is a concrete SchedulerDriver.
class SchedulerDriver
{
public:
  // Virtual destructor with nothing to do (necessary to instantiate
  // subclasses). 'stop()' is expected to have been called before the
  // driver is destroyed.
  virtual ~SchedulerDriver() = default;

  // Starts the scheduler driver. No other driver call may be made
  // before this one.
  virtual Status start() = 0;

  // Stops the scheduler driver.
  //
  // With 'failover' set to false, this framework is expected never to
  // reconnect to Mesos, so Mesos will unregister the framework and
  // shutdown all its tasks and executors. With 'failover' set to
  // true, all executors and tasks remain running (for some framework
  // specific failover timeout), which allows the scheduler to
  // reconnect (possibly in the same process, or from a different
  // process, for example, on a different machine).
  virtual Status stop(bool failover = false) = 0;

  // Aborts the driver so that no more callbacks can be made to the
  // scheduler. Abort and stop deliberately have separate semantics:
  // code can detect an aborted driver (i.e., via the return status of
  // SchedulerDriver::join) and instantiate and start another driver
  // if desired (from within the same process). Note that 'abort()'
  // does not automatically call 'stop()'.
  virtual Status abort() = 0;

  // Waits for the driver to be stopped or aborted, possibly
  // _blocking_ the current thread indefinitely. The returned status
  // tells whether the driver was aborted (see mesos.proto for a
  // description of Status).
  virtual Status join() = 0;

  // Starts the driver and immediately joins (i.e., blocks on) it.
  virtual Status run() = 0;

  // Requests resources from Mesos (see mesos.proto for a description
  // of Request and how, for example, to request resources from
  // specific slaves). Whatever resources are available are offered to
  // the framework asynchronously via the Scheduler::resourceOffers
  // callback.
  virtual Status requestResources(
      const std::vector<Request>& requests) = 0;

  // Launches the given set of tasks. Resources that remain (i.e.,
  // those not used by the launched tasks or their executors) are
  // considered declined; this includes resources used by tasks that
  // the framework attempted to launch but that failed (with
  // TASK_ERROR) due to a malformed task description. The specified
  // filters are applied on all unused resources (see mesos.proto for
  // a description of Filters). When multiple offers are provided, the
  // available resources are aggregated; all offers must belong to the
  // same slave. Calling this function with an empty collection of
  // tasks declines the offers in their entirety (see
  // SchedulerDriver::declineOffer).
  virtual Status launchTasks(const std::vector<OfferID>& offerIds,
                             const std::vector<TaskInfo>& tasks,
                             const Filters& filters = Filters()) = 0;

  // DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
  virtual Status launchTasks(const OfferID& offerId,
                             const std::vector<TaskInfo>& tasks,
                             const Filters& filters = Filters()) = 0;

  // Kills the specified task. Attempting to kill a task is currently
  // not reliable. For example, a scheduler that fails over while it
  // was attempting to kill a task will need to retry in the future.
  // Likewise, the request is dropped if unregistered / disconnected
  // (these semantics may be changed in the future).
  virtual Status killTask(const TaskID& taskId) = 0;

  // Accepts the given offers and performs a sequence of operations on
  // those accepted offers. See Offer.Operation in mesos.proto for the
  // set of available operations. Resources that remain (i.e., those
  // not used by the launched tasks or their executors) are considered
  // declined; this includes resources used by tasks that the
  // framework attempted to launch but that failed (with TASK_ERROR)
  // due to a malformed task description. The specified filters are
  // applied on all unused resources (see mesos.proto for a
  // description of Filters). When multiple offers are provided, the
  // available resources are aggregated; all offers must belong to the
  // same slave.
  virtual Status acceptOffers(const std::vector<OfferID>& offerIds,
                              const std::vector<Offer::Operation>& operations,
                              const Filters& filters = Filters()) = 0;

  // Declines an offer in its entirety and applies the specified
  // filters on the resources (see mesos.proto for a description of
  // Filters). This may be done at any time; it does not have to
  // happen within the Scheduler::resourceOffers callback.
  virtual Status declineOffer(const OfferID& offerId,
                              const Filters& filters = Filters()) = 0;

  // Removes all filters previously set by the framework (e.g., via
  // launchTasks()). This enables the framework to receive offers from
  // those filtered slaves.
  virtual Status reviveOffers() = 0;

  // Informs the Mesos master to stop sending offers to the framework.
  // To resume getting offers, the scheduler should call
  // reviveOffers().
  virtual Status suppressOffers() = 0;

  // Acknowledges the status update. Call this only once the status
  // update has been processed durably by the scheduler. Note that
  // explicit acknowledgements must be requested via the constructor
  // argument, otherwise a call to this method will cause the driver
  // to crash.
  virtual Status acknowledgeStatusUpdate(const TaskStatus& status) = 0;

  // Sends a message from the framework to one of its executors. Such
  // messages are best effort; do not expect a framework message to be
  // retransmitted in any reliable fashion.
  virtual Status sendFrameworkMessage(const ExecutorID& executorId,
                                      const SlaveID& slaveId,
                                      const std::string& data) = 0;

  // Lets the framework query the status for non-terminal tasks. The
  // master sends back the latest task status for each task in
  // 'statuses', if possible. A task that is no longer known results
  // in a TASK_LOST update. When 'statuses' is empty, the master sends
  // the latest status for each task currently known.
  virtual Status reconcileTasks(const std::vector<TaskStatus>& statuses) = 0;
};
// Concrete implementation of a SchedulerDriver that connects a
// Scheduler with a Mesos master. The MesosSchedulerDriver is
// thread-safe.
//
// Note that scheduler failover is supported in Mesos. After a
// scheduler is registered with Mesos it may failover (to a new
// process on the same machine or across multiple machines) by
// creating a new driver with the ID given to it in
// Scheduler::registered.
//
// The driver is responsible for invoking the Scheduler callbacks as
// it communicates with the Mesos master.
//
// Note that blocking on the MesosSchedulerDriver (e.g., via
// MesosSchedulerDriver::join) doesn't affect the scheduler callbacks
// in any way because they are handled by a different thread.
//
// Note that the driver uses GLOG to do its own logging. GLOG flags
// can be set via environment variables, prefixing the flag name with
// "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
// src/logging/flags.hpp. Mesos flags can also be set via environment
// variables, prefixing the flag name with "MESOS_", e.g.,
// "MESOS_QUIET=1".
//
// See src/examples/test_framework.cpp for an example of using the
// MesosSchedulerDriver.
class MesosSchedulerDriver : public SchedulerDriver
{
public:
  // Creates a new driver for the specified scheduler. The master
  // should be one of:
  //
  //     host:port
  //     zk://host1:port1,host2:port2,.../path
  //     zk://username:password@host1:port1,host2:port2,.../path
  //     file:///path/to/file (where file contains one of the above)
  //
  // The driver will attempt to "failover" if the specified
  // FrameworkInfo includes a valid FrameworkID.
  //
  // Any Mesos configuration options are read from environment
  // variables, as well as any configuration files found through the
  // environment variables.
  //
  // TODO(vinod): Deprecate this once 'MesosSchedulerDriver' can take
  // 'Option<Credential>' as parameter. Currently it cannot because
  // 'stout' is not visible from here.
  MesosSchedulerDriver(
      Scheduler* scheduler,
      const FrameworkInfo& framework,
      const std::string& master);

  // Same as the above constructor but takes 'credential' as argument.
  // The credential will be used for authenticating with the master.
  MesosSchedulerDriver(
      Scheduler* scheduler,
      const FrameworkInfo& framework,
      const std::string& master,
      const Credential& credential);

  // These constructors are the same as the above two, but allow
  // the framework to specify whether implicit or explicit
  // acknowledgements are desired. See statusUpdate() for the
  // details about explicit acknowledgements.
  //
  // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
  // these new constructors are exposed.
  MesosSchedulerDriver(
      Scheduler* scheduler,
      const FrameworkInfo& framework,
      const std::string& master,
      bool implicitAcknowledgements);

  MesosSchedulerDriver(
      Scheduler* scheduler,
      const FrameworkInfo& framework,
      const std::string& master,
      bool implicitAcknowledgements,
      const Credential& credential);

  // This destructor will block indefinitely if
  // MesosSchedulerDriver::start was invoked successfully (possibly
  // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
  // not been invoked.
  virtual ~MesosSchedulerDriver();

  // See SchedulerDriver for descriptions of these. Each one is marked
  // 'override' so that the compiler rejects any signature that stops
  // matching the corresponding SchedulerDriver virtual.
  Status start() override;
  Status stop(bool failover = false) override;
  Status abort() override;
  Status join() override;
  Status run() override;

  Status requestResources(
      const std::vector<Request>& requests) override;

  // TODO(nnielsen): launchTasks using single offer is deprecated.
  // Use launchTasks with offer list instead.
  Status launchTasks(
      const OfferID& offerId,
      const std::vector<TaskInfo>& tasks,
      const Filters& filters = Filters()) override;

  Status launchTasks(
      const std::vector<OfferID>& offerIds,
      const std::vector<TaskInfo>& tasks,
      const Filters& filters = Filters()) override;

  Status killTask(const TaskID& taskId) override;

  Status acceptOffers(
      const std::vector<OfferID>& offerIds,
      const std::vector<Offer::Operation>& operations,
      const Filters& filters = Filters()) override;

  Status declineOffer(
      const OfferID& offerId,
      const Filters& filters = Filters()) override;

  Status reviveOffers() override;

  Status suppressOffers() override;

  Status acknowledgeStatusUpdate(
      const TaskStatus& status) override;

  Status sendFrameworkMessage(
      const ExecutorID& executorId,
      const SlaveID& slaveId,
      const std::string& data) override;

  Status reconcileTasks(
      const std::vector<TaskStatus>& statuses) override;

protected:
  // Used to detect (i.e., choose) the master.
  std::shared_ptr<master::detector::MasterDetector> detector;

private:
  // Private helper defined outside this header.
  // NOTE(review): presumably the setup shared by the constructors;
  // confirm against the implementation file.
  void initialize();

  // NOTE(review): the next three members presumably hold the
  // constructor arguments of the same names; confirm against the
  // implementation file.
  Scheduler* scheduler;
  FrameworkInfo framework;
  std::string master;

  // Used for communicating with the master.
  internal::SchedulerProcess* process;

  // URL for the master (e.g., zk://, file://, etc).
  std::string url;

  // Mutex for enforcing serial execution of all non-callbacks.
  std::recursive_mutex mutex;

  // Latch for waiting until driver terminates.
  process::Latch* latch;

  // Current status of the driver.
  Status status;

  // NOTE(review): presumably records whether implicit acknowledgements
  // were requested at construction. The member keeps its historical
  // spelling ("Acknowlegements") because definitions outside this
  // header presumably refer to it by this name; rename it only
  // together with those.
  const bool implicitAcknowlegements;

  // NOTE(review): presumably the credential used for authenticating
  // with the master (see the constructors taking 'credential');
  // confirm ownership in the implementation file.
  const Credential* credential;

  // Scheduler process ID.
  std::string schedulerId;
};
} // namespace mesos {
#endif // __MESOS_SCHEDULER_HPP__