// blob: 3fa188709db5e31a2c8bcb5c666de2e55fd889b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __SLAVE_HPP__
#define __SLAVE_HPP__
#include <stdint.h>
#include <list>
#include <string>
#include <vector>
#include <boost/circular_buffer.hpp>
#include <mesos/resources.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/module/authenticatee.hpp>
#include <process/http.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <stout/bytes.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/uuid.hpp>
#include "master/detector.hpp"
#include "slave/constants.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/metrics.hpp"
#include "slave/monitor.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
#include "common/attributes.hpp"
#include "common/protobuf_utils.hpp"
#include "files/files.hpp"
#include "messages/messages.hpp"
namespace mesos {
namespace internal {
// Within this header MasterDetector is only used through pointers (the
// Slave constructor argument and the 'detector' member), so a forward
// declaration is sufficient for the declarations below.
class MasterDetector; // Forward declaration.
namespace slave {
// NOTE(review): this using-directive sits at namespace scope inside a
// header, so every translation unit that includes this file gets all of
// 'process' visible inside mesos::internal::slave. Code elsewhere may
// depend on that, so it is left in place; new declarations should spell
// out 'process::' explicitly rather than rely on it.
using namespace process;
// Some forward declarations.
// StatusUpdateManager is only used through pointers in this header.
// Executor and Framework are defined further down in this file; they are
// declared here because Slave refers to them (pointer parameters, the
// 'frameworks' map and the friend declarations) before their definitions.
class StatusUpdateManager;
struct Executor;
struct Framework;
// The Mesos slave process.
//
// Derives from ProtobufProcess<Slave>. Apart from three trivial inline
// gauge helpers, this header only declares the interface; the method
// definitions are not in this file (the Http route handlers live in
// slave/http.cpp, see the comment on the nested Http class).
//
// Many methods come in chains named 'foo', '_foo', '__foo'. Per the
// comments on the individual members, the underscored variants continue
// the work of the previous step once an asynchronous result (passed in
// as a process::Future) is available.
//
// NOTE: libprocess names are written with an explicit 'process::'
// qualifier throughout this class so that the declaration does not
// depend on a using-directive being in effect.
//
// NOTE: The order of the data members below determines their
// initialization order; do not reorder them without checking the
// constructor.
class Slave : public ProtobufProcess<Slave>
{
public:
  // All collaborators are handed in as raw pointers and stored in the
  // members of the same name.
  // NOTE(review): ownership of the pointed-to objects is not visible
  // from this header; presumably they are owned by the caller and must
  // outlive the slave -- confirm against the call sites.
  Slave(const Flags& flags,
        MasterDetector* detector,
        Containerizer* containerizer,
        Files* files,
        GarbageCollector* gc,
        StatusUpdateManager* statusUpdateManager);

  virtual ~Slave();

  // The methods below that take a 'from' UPID receive the sender of the
  // triggering message as that argument.
  // NOTE(review): presumably these are installed as protobuf message
  // handlers in initialize() -- confirm in the implementation.

  void shutdown(const process::UPID& from, const std::string& message);

  void registered(const process::UPID& from, const SlaveID& slaveId);

  void reregistered(
      const process::UPID& from,
      const SlaveID& slaveId,
      const std::vector<ReconcileTasksMessage>& reconciliations);

  // 'maxBackoff' bounds the backoff used between registration attempts
  // (NOTE(review): inferred from the parameter name -- confirm).
  void doReliableRegistration(Duration maxBackoff);

  // Made 'virtual' for Slave mocking.
  virtual void runTask(
      const process::UPID& from,
      const FrameworkInfo& frameworkInfo,
      const FrameworkID& frameworkId,
      const std::string& pid,
      const TaskInfo& task);

  // Continuation of 'runTask'; 'future' carries the boolean result of
  // the asynchronous step that preceded it.
  // Made 'virtual' for Slave mocking.
  virtual void _runTask(
      const process::Future<bool>& future,
      const FrameworkInfo& frameworkInfo,
      const FrameworkID& frameworkId,
      const std::string& pid,
      const TaskInfo& task);

  // NOTE(review): presumably removes 'path' from the garbage collection
  // schedule; the meaning of the boolean result is defined by the
  // implementation -- confirm.
  process::Future<bool> unschedule(const std::string& path);

  // Made 'virtual' for Slave mocking.
  virtual void killTask(
      const process::UPID& from,
      const FrameworkID& frameworkId,
      const TaskID& taskId);

  void shutdownFramework(
      const process::UPID& from,
      const FrameworkID& frameworkId);

  void schedulerMessage(
      const SlaveID& slaveId,
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const std::string& data);

  void updateFramework(const FrameworkID& frameworkId, const std::string& pid);

  void checkpointResources(const std::vector<Resource>& checkpointedResources);

  void registerExecutor(
      const process::UPID& from,
      const FrameworkID& frameworkId,
      const ExecutorID& executorId);

  // Called when an executor re-registers with a recovering slave.
  // 'tasks' : Unacknowledged tasks (i.e., tasks that the executor
  //           driver never received an ACK for.)
  // 'updates' : Unacknowledged updates.
  void reregisterExecutor(
      const process::UPID& from,
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const std::vector<TaskInfo>& tasks,
      const std::vector<StatusUpdate>& updates);

  // Continuation of 'reregisterExecutor'; 'future' carries the outcome
  // of the asynchronous step that preceded it.
  void _reregisterExecutor(
      const process::Future<Nothing>& future,
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const ContainerID& containerId);

  void executorMessage(
      const SlaveID& slaveId,
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const std::string& data);

  // TODO(vinod): Remove this in 0.23.0.
  void pingOld(const process::UPID& from, const std::string& body);

  // NOTE: This handler is added to make it easy for upgrading slaves
  // and masters to 0.22.0. A 0.22.0 master will send PingSlaveMessage
  // which will call this method.
  void ping(const process::UPID& from, bool connected);

  // Handles the status update.
  // NOTE: If 'pid' is a valid UPID an ACK is sent to this pid
  // after the update is successfully handled. If pid == UPID()
  // no ACK is sent. The latter is used by the slave to send
  // status updates it generated (e.g., TASK_LOST).
  // NOTE: StatusUpdate is passed by value because it is modified
  // to ensure source field is set.
  void statusUpdate(StatusUpdate update, const process::UPID& pid);

  // Continue handling the status update after optionally updating the
  // container's resources. 'future' is None when no resource update
  // was started (it is an Option).
  void _statusUpdate(
      const Option<process::Future<Nothing> >& future,
      const StatusUpdate& update,
      const process::UPID& pid,
      const ExecutorID& executorId,
      const ContainerID& containerId,
      bool checkpoint);

  // This is called when the status update manager finishes
  // handling the update. If the handling is successful, an
  // acknowledgment is sent to the executor.
  void __statusUpdate(
      const process::Future<Nothing>& future,
      const StatusUpdate& update,
      const process::UPID& pid);

  // This is called by status update manager to forward a status
  // update to the master. Note that the latest state of the task is
  // added to the update before forwarding (hence 'update' is taken by
  // value).
  void forward(StatusUpdate update);

  void statusUpdateAcknowledgement(
      const process::UPID& from,
      const SlaveID& slaveId,
      const FrameworkID& frameworkId,
      const TaskID& taskId,
      const std::string& uuid);

  // Continuation of 'statusUpdateAcknowledgement'. Note that the
  // argument order (taskId before frameworkId) differs from the method
  // above and that 'uuid' is a parsed UUID here rather than a string.
  void _statusUpdateAcknowledgement(
      const process::Future<bool>& future,
      const TaskID& taskId,
      const FrameworkID& frameworkId,
      const UUID& uuid);

  // 'future' carries the boolean result of the launch attempt for the
  // executor's container.
  void executorLaunched(
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const ContainerID& containerId,
      const process::Future<bool>& future);

  // 'termination' carries the containerizer's Termination record for
  // the executor's container.
  void executorTerminated(
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const process::Future<containerizer::Termination>& termination);

  // NOTE: Pulled these to public to make it visible for testing.
  // TODO(vinod): Make tests friends to this class instead.

  // Garbage collects the directories based on the current disk usage.
  // TODO(vinod): Instead of making this function public, we need to
  // mock both GarbageCollector (and pass it through slave's constructor)
  // and os calls.
  void _checkDiskUsage(const process::Future<double>& usage);

  // Shut down an executor. This is a two phase process. First, an
  // executor receives a shut down message (shut down phase), then
  // after a configurable timeout the slave actually forces a kill
  // (kill phase, via the isolator) if the executor has not
  // exited.
  void shutdownExecutor(Framework* framework, Executor* executor);

  // Invoked whenever the detector detects a change in masters.
  // Made public for testing purposes.
  void detected(const process::Future<Option<MasterInfo> >& pid);

  // Lifecycle state of the slave itself. 'state' is a public data
  // member holding the current value.
  enum State {
    RECOVERING, // Slave is doing recovery.
    DISCONNECTED, // Slave is not connected to the master.
    RUNNING, // Slave has (re-)registered.
    TERMINATING, // Slave is shutting down.
  } state;

  // TODO(benh): Clang requires members to be public in order to take
  // their address which we do in tests (for things like
  // FUTURE_DISPATCH).
  // protected:
  virtual void initialize();
  virtual void finalize();
  virtual void exited(const process::UPID& pid);

  // This is called when the resource limits of the container have
  // been updated for the given tasks. If the update is successful, we
  // flush the given tasks to the executor by sending RunTaskMessages.
  // TODO(jieyu): Consider renaming it to '__runTasks' once the slave
  // starts to support launching multiple tasks in one call (i.e.,
  // multi-tasks version of 'runTask').
  void runTasks(
      const process::Future<Nothing>& future,
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const ContainerID& containerId,
      const std::list<TaskInfo>& tasks);

  // 'result' carries the outcome of attaching 'path'
  // (NOTE(review): presumably via 'files' -- confirm).
  void fileAttached(const process::Future<Nothing>& result,
                    const std::string& path);

  Nothing detachFile(const std::string& path);

  // Triggers a re-detection of the master when the slave does
  // not receive a ping.
  void pingTimeout(process::Future<Option<MasterInfo> > future);

  void authenticate();

  // Helper routine to lookup a framework.
  // NOTE(review): the result is a raw pointer; presumably NULL when the
  // framework is unknown -- confirm in the implementation.
  Framework* getFramework(const FrameworkID& frameworkId);

  // Returns an ExecutorInfo for a TaskInfo (possibly
  // constructing one if the task has a CommandInfo).
  ExecutorInfo getExecutorInfo(
      const FrameworkID& frameworkId,
      const TaskInfo& task);

  // Handle the second phase of shutting down an executor for those
  // executors that have not properly shutdown within a timeout.
  void shutdownExecutorTimeout(
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const ContainerID& containerId);

  // Shuts down the executor if it did not register yet.
  void registerExecutorTimeout(
      const FrameworkID& frameworkId,
      const ExecutorID& executorId,
      const ContainerID& containerId);

  // Cleans up all un-reregistered executors during recovery.
  void reregisterExecutorTimeout();

  // This function returns the max age of executor/slave directories allowed,
  // given a disk usage. This value could be used to tune gc.
  Duration age(double usage);

  // Checks the current disk usage and schedules for gc as necessary.
  void checkDiskUsage();

  // Recovers the slave, status update manager and isolator.
  process::Future<Nothing> recover(const Result<state::State>& state);

  // This is called after 'recover()'. If 'flags.reconnect' is
  // 'reconnect', the slave attempts to reconnect to any old live
  // executors. Otherwise, the slave attempts to shutdown/kill them.
  // NOTE(review): the flag name quoted above should be checked against
  // slave/flags.hpp.
  process::Future<Nothing> _recover();

  // This is a helper to call recover() on the containerizer at the end of
  // recover() and before __recover().
  // TODO(idownes): Remove this when we support defers to objects.
  process::Future<Nothing> _recoverContainerizer(
      const Option<state::SlaveState>& state);

  // This is called when recovery finishes.
  void __recover(const process::Future<Nothing>& future);

  // Helper to recover a framework from the specified state.
  void recoverFramework(const state::FrameworkState& state);

  // Removes and garbage collects the executor.
  void removeExecutor(Framework* framework, Executor* executor);

  // Removes and garbage collects the framework.
  // Made 'virtual' for Slave mocking.
  virtual void removeFramework(Framework* framework);

  // Schedules a 'path' for gc based on its modification time.
  process::Future<Nothing> garbageCollect(const std::string& path);

  // Called when the slave was signaled from the specified user.
  void signaled(int signal, int uid);

private:
  void _authenticate();
  void authenticationTimeout(process::Future<bool> future);

  // Inner class used to namespace HTTP route handlers (see
  // slave/http.cpp for implementations).
  class Http
  {
  public:
    // Stores the back-pointer to the enclosing slave; it is not owned.
    explicit Http(Slave* _slave) : slave(_slave) {}

    // /slave/health
    process::Future<process::http::Response> health(
        const process::http::Request& request);

    // /slave/stats.json
    process::Future<process::http::Response> stats(
        const process::http::Request& request);

    // /slave/state.json
    process::Future<process::http::Response> state(
        const process::http::Request& request);

    // Help text for the health endpoint (defined out of line).
    static const std::string HEALTH_HELP;

  private:
    Slave* slave;
  } http;

  friend struct Framework;
  friend struct Executor;
  friend struct Metrics;

  // Declared private and (in this header) left undefined so that the
  // class cannot be copied.
  Slave(const Slave&); // No copying.
  Slave& operator = (const Slave&); // No assigning.

  // Gauge methods.

  // Number of entries currently in 'frameworks'.
  double _frameworks_active()
  {
    return frameworks.size();
  }

  // Seconds elapsed between 'startTime' and now.
  double _uptime_secs()
  {
    return (process::Clock::now() - startTime).secs();
  }

  // 1 when 'master' is set, 0 otherwise.
  double _registered()
  {
    return master.isSome() ? 1 : 0;
  }

  double _tasks_staging();
  double _tasks_starting();
  double _tasks_running();

  double _executors_registering();
  double _executors_running();
  double _executors_terminating();

  void sendExecutorTerminatedStatusUpdate(
      const TaskID& taskId,
      const process::Future<containerizer::Termination>& termination,
      const FrameworkID& frameworkId,
      const Executor* executor);

  const Flags flags;

  SlaveInfo info;

  // Resources that are checkpointed by the slave.
  Resources checkpointedResources;

  // PID of the master; None when no master is set (see '_registered').
  Option<process::UPID> master;

  // Frameworks known to the slave, keyed by id, held as raw pointers.
  hashmap<FrameworkID, Framework*> frameworks;

  // Bounded history of frameworks; unlike 'frameworks' these entries
  // are held through process::Owned.
  boost::circular_buffer<process::Owned<Framework> > completedFrameworks;

  MasterDetector* detector;

  Containerizer* containerizer;

  Files* files;

  // Statistics (initialized in Slave::initialize).
  struct
  {
    // One counter per TaskState value.
    uint64_t tasks[TaskState_ARRAYSIZE];
    uint64_t validStatusUpdates;
    uint64_t invalidStatusUpdates;
    uint64_t validFrameworkMessages;
    uint64_t invalidFrameworkMessages;
  } stats;

  Metrics metrics;

  // Per-resource gauges; 'name' selects the resource.
  double _resources_total(const std::string& name);
  double _resources_used(const std::string& name);
  double _resources_percent(const std::string& name);

  // Reference point for '_uptime_secs'.
  process::Time startTime;

  GarbageCollector* gc;
  ResourceMonitor monitor;

  StatusUpdateManager* statusUpdateManager;

  // Master detection future.
  process::Future<Option<MasterInfo> > detection;

  // Timer for triggering re-detection when no ping is received from
  // the master.
  process::Timer pingTimer;

  // Flag to indicate if recovery, including reconciling (i.e., reconnect/kill)
  // with executors is finished.
  process::Promise<Nothing> recovered;

  // Root meta directory containing checkpointed data.
  const std::string metaDir;

  // Indicates the number of errors ignored in "--no-strict" recovery mode.
  unsigned int recoveryErrors;

  Option<Credential> credential;

  // Authenticatee name as supplied via flags.
  std::string authenticateeName;

  Authenticatee* authenticatee;

  // Indicates if an authentication attempt is in progress.
  Option<process::Future<bool> > authenticating;

  // Indicates if the authentication is successful.
  bool authenticated;

  // Indicates if a new authentication attempt should be enforced.
  bool reauthenticate;
};
// Information describing an executor.
//
// Only declarations live here; the member functions are defined out of
// line. The order of the data members determines their initialization
// order, so do not reorder them without checking the constructor.
struct Executor
{
// 'slave' is stored as a non-owning back-pointer (see the 'slave'
// member). The remaining arguments correspond to the const members of
// the same name below.
Executor(
Slave* slave,
const FrameworkID& frameworkId,
const ExecutorInfo& info,
const ContainerID& containerId,
const std::string& directory,
bool checkpoint);
~Executor();
// Task bookkeeping. Tasks are tracked in the four containers declared
// at the end of this struct.
// NOTE(review): the returned Task* is presumably owned by this
// executor (it is stored as a raw pointer in 'launchedTasks') --
// confirm in the implementation.
Task* addTask(const TaskInfo& task);
void terminateTask(const TaskID& taskId, const mesos::TaskStatus& status);
void completeTask(const TaskID& taskId);
// Checkpointing and recovery helpers.
// NOTE(review): presumably these only write to disk when 'checkpoint'
// is true -- confirm in the implementation.
void checkpointExecutor();
void checkpointTask(const TaskInfo& task);
void recoverTask(const state::TaskState& state);
void updateTaskState(const TaskStatus& status);
// Returns true if there are any queued/launched/terminated tasks.
bool incompleteTasks();
// Returns true if this is a command executor.
bool isCommandExecutor() const;
// Lifecycle state of the executor. 'state' is a public data member
// holding the current value.
enum State {
REGISTERING, // Executor is launched but not (re-)registered yet.
RUNNING, // Executor has (re-)registered.
TERMINATING, // Executor is being shutdown/killed.
TERMINATED, // Executor has terminated but there might be pending updates.
} state;
// We store the pointer to 'Slave' to get access to its methods and
// variables. One could imagine 'Executor' as being an inner class
// of the 'Slave' class.
Slave* slave;
// Identity and configuration; const for the lifetime of the object.
const ExecutorID id;
const ExecutorInfo info;
const FrameworkID frameworkId;
const ContainerID containerId;
const std::string directory;
const bool checkpoint;
// PID of the executor; not const, unlike the members above.
process::UPID pid;
// Currently consumed resources.
Resources resources;
// Tasks can be found in one of the following four data structures:
// Not yet launched.
LinkedHashMap<TaskID, TaskInfo> queuedTasks;
// Running.
LinkedHashMap<TaskID, Task*> launchedTasks;
// Terminated but pending updates.
LinkedHashMap<TaskID, Task*> terminatedTasks;
// Terminated and updates acked.
// NOTE: We use a shared pointer for Task because clang doesn't like
// Boost's implementation of circular_buffer with Task (Boost
// attempts to do some memset's which are unsafe).
boost::circular_buffer<memory::shared_ptr<Task> > completedTasks;
private:
// Declared private and (in this header) left undefined so that the
// struct cannot be copied.
Executor(const Executor&); // No copying.
Executor& operator = (const Executor&); // No assigning.
// Backing value for isCommandExecutor()
// (NOTE(review): presumably computed once in the constructor -- confirm).
bool commandExecutor;
};
// Information about a framework.
struct Framework
{
Framework(
Slave* slave,
const FrameworkID& id,
const FrameworkInfo& info,
const process::UPID& pid);
~Framework();
Executor* launchExecutor(
const ExecutorInfo& executorInfo,
const TaskInfo& taskInfo);
void destroyExecutor(const ExecutorID& executorId);
Executor* getExecutor(const ExecutorID& executorId);
Executor* getExecutor(const TaskID& taskId);
void recoverExecutor(const state::ExecutorState& state);
enum State {
RUNNING, // First state of a newly created framework.
TERMINATING, // Framework is shutting down in the cluster.
} state;
// We store the pointer to 'Slave' to get access to its methods
// variables. One could imagine 'Framework' as being an inner class
// of the 'Slave' class.
Slave* slave;
const FrameworkID id;
const FrameworkInfo info;
UPID pid;
// Executors with pending tasks.
hashmap<ExecutorID, hashmap<TaskID, TaskInfo> > pending;
// Current running executors.
hashmap<ExecutorID, Executor*> executors;
// Up to MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK completed executors.
boost::circular_buffer<process::Owned<Executor> > completedExecutors;
private:
Framework(const Framework&); // No copying.
Framework& operator = (const Framework&); // No assigning.
};
// Stream insertion operators for the three State enums declared above
// (Slave::State, Framework::State and Executor::State). Only the
// declarations are in this header; the exact text written for each
// enumerator is determined by the out-of-line definitions.
std::ostream& operator << (std::ostream& stream, Slave::State state);
std::ostream& operator << (std::ostream& stream, Framework::State state);
std::ostream& operator << (std::ostream& stream, Executor::State state);
} // namespace slave {
} // namespace internal {
} // namespace mesos {
#endif // __SLAVE_HPP__