blob: 3a75d7b1584b99d3aaf1877086506727ae03ee79 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef __SLAVE_HPP__
#define __SLAVE_HPP__
#include <stdint.h>
#include <list>
#include <memory>
#include <string>
#include <vector>
// See comments in `master.hpp` or MESOS-9177.
#define BOOST_CB_DISABLE_DEBUG 1
#define BOOST_CB_ENABLE_DEBUG 0
#include <boost/circular_buffer.hpp>
#include <mesos/attributes.hpp>
#include <mesos/resources.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/agent/agent.hpp>
#include <mesos/authentication/secret_generator.hpp>
#include <mesos/executor/executor.hpp>
#include <mesos/master/detector.hpp>
#include <mesos/module/authenticatee.hpp>
#include <mesos/slave/containerizer.hpp>
#include <mesos/slave/qos_controller.hpp>
#include <mesos/slave/resource_estimator.hpp>
#include <mesos/v1/executor/executor.hpp>
#include <process/http.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/limiter.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/shared.hpp>
#include <process/sequence.hpp>
#include <stout/boundedhashmap.hpp>
#include <stout/bytes.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/recordio.hpp>
#include <stout/uuid.hpp>
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/recordio.hpp"
#include "files/files.hpp"
#include "internal/evolve.hpp"
#include "messages/messages.hpp"
#include "resource_provider/daemon.hpp"
#include "resource_provider/manager.hpp"
#include "slave/constants.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/http.hpp"
#include "slave/metrics.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
// `REGISTERING` is used as an enum value, but it's actually defined as a
// constant in the Windows SDK.
#ifdef __WINDOWS__
#undef REGISTERING
#undef REGISTERED
#endif // __WINDOWS__
namespace mesos {
// Forward declarations.
class Authorizer;
class DiskProfileAdaptor;
namespace internal {
namespace slave {
// Some forward declarations.
class TaskStatusUpdateManager;
class Executor;
class Framework;
struct HttpConnection;
struct ResourceProvider;
class Slave : public ProtobufProcess<Slave>
{
public:
Slave(const std::string& id,
const Flags& flags,
mesos::master::detector::MasterDetector* detector,
Containerizer* containerizer,
Files* files,
GarbageCollector* gc,
TaskStatusUpdateManager* taskStatusUpdateManager,
mesos::slave::ResourceEstimator* resourceEstimator,
mesos::slave::QoSController* qosController,
mesos::SecretGenerator* secretGenerator,
const Option<Authorizer*>& authorizer);
~Slave() override;
void shutdown(const process::UPID& from, const std::string& message);
void registered(
const process::UPID& from,
const SlaveID& slaveId,
const MasterSlaveConnection& connection);
void reregistered(
const process::UPID& from,
const SlaveID& slaveId,
const std::vector<ReconcileTasksMessage>& reconciliations,
const MasterSlaveConnection& connection);
void doReliableRegistration(Duration maxBackoff);
// TODO(mzhu): Combine this with `runTask()' and replace all `runTask()'
// mock with `run()` mock.
void handleRunTaskMessage(
const process::UPID& from,
RunTaskMessage&& runTaskMessage);
// Made 'virtual' for Slave mocking.
virtual void runTask(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
const process::UPID& pid,
const TaskInfo& task,
const std::vector<ResourceVersionUUID>& resourceVersionUuids,
const Option<bool>& launchExecutor);
void run(
const FrameworkInfo& frameworkInfo,
ExecutorInfo executorInfo,
Option<TaskInfo> task,
Option<TaskGroupInfo> taskGroup,
const std::vector<ResourceVersionUUID>& resourceVersionUuids,
const process::UPID& pid,
const Option<bool>& launchExecutor,
bool executorGeneratedForCommandTask);
// Made 'virtual' for Slave mocking.
//
// This function returns a future so that we can encapsulate a task(group)
// launch operation (from agent receiving the run message to the completion
// of `_run()`) into a single future. This includes all the asynchronous
// steps (currently two: unschedule GC and task authorization) prior to the
// executor launch.
virtual process::Future<Nothing> _run(
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo,
const Option<TaskInfo>& task,
const Option<TaskGroupInfo>& taskGroup,
const std::vector<ResourceVersionUUID>& resourceVersionUuids,
const Option<bool>& launchExecutor);
// Made 'virtual' for Slave mocking.
virtual void __run(
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo,
const Option<TaskInfo>& task,
const Option<TaskGroupInfo>& taskGroup,
const std::vector<ResourceVersionUUID>& resourceVersionUuids,
const Option<bool>& launchExecutor,
bool executorGeneratedForCommandTask);
// This is called when the resource limits of the container have
// been updated for the given tasks and task groups. If the update is
// successful, we flush the given tasks to the executor by sending
// RunTaskMessages or `LAUNCH_GROUP` events.
//
// Made 'virtual' for Slave mocking.
virtual void ___run(
const process::Future<Nothing>& future,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId,
const std::vector<TaskInfo>& tasks,
const std::vector<TaskGroupInfo>& taskGroups);
// TODO(mzhu): Combine this with `runTaskGroup()' and replace all
// `runTaskGroup()' mock with `run()` mock.
void handleRunTaskGroupMessage(
const process::UPID& from,
RunTaskGroupMessage&& runTaskGroupMessage);
// Made 'virtual' for Slave mocking.
virtual void runTaskGroup(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo,
const TaskGroupInfo& taskGroupInfo,
const std::vector<ResourceVersionUUID>& resourceVersionUuids,
const Option<bool>& launchExecutor);
// Made 'virtual' for Slave mocking.
virtual void killTask(
const process::UPID& from,
const KillTaskMessage& killTaskMessage);
// Made 'virtual' for Slave mocking.
virtual void shutdownExecutor(
const process::UPID& from,
const FrameworkID& frameworkId,
const ExecutorID& executorId);
// Shut down an executor. This is a two phase process. First, an
// executor receives a shut down message (shut down phase), then
// after a configurable timeout the slave actually forces a kill
// (kill phase, via the isolator) if the executor has not
// exited.
//
// Made 'virtual' for Slave mocking.
virtual void _shutdownExecutor(Framework* framework, Executor* executor);
void shutdownFramework(
const process::UPID& from,
const FrameworkID& frameworkId);
void schedulerMessage(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const std::string& data);
void updateFramework(
const UpdateFrameworkMessage& message);
void checkpointResources(
std::vector<Resource> checkpointedResources,
bool changeTotal);
void checkpointResourcesMessage(
const std::vector<Resource>& checkpointedResources);
void applyOperation(const ApplyOperationMessage& message);
// Reconciles pending operations with the master. This is necessary to handle
// cases in which operations were dropped in transit, or in which an agent's
// `UpdateSlaveMessage` was sent at the same time as an operation was en route
// from the master to the agent.
void reconcileOperations(const ReconcileOperationsMessage& message);
void subscribe(
HttpConnection http,
const executor::Call::Subscribe& subscribe,
Framework* framework,
Executor* executor);
void registerExecutor(
const process::UPID& from,
const FrameworkID& frameworkId,
const ExecutorID& executorId);
// Called when an executor reregisters with a recovering slave.
// 'tasks' : Unacknowledged tasks (i.e., tasks that the executor
// driver never received an ACK for.)
// 'updates' : Unacknowledged updates.
void reregisterExecutor(
const process::UPID& from,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const std::vector<TaskInfo>& tasks,
const std::vector<StatusUpdate>& updates);
void _reregisterExecutor(
const process::Future<Nothing>& future,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId);
void executorMessage(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const std::string& data);
void ping(const process::UPID& from, bool connected);
// Handles the task status update.
// NOTE: If 'pid' is a valid UPID an ACK is sent to this pid
// after the update is successfully handled. If pid == UPID()
// no ACK is sent. The latter is used by the slave to send
// status updates it generated (e.g., TASK_LOST). If pid == None()
// an ACK is sent to the corresponding HTTP based executor.
// NOTE: StatusUpdate is passed by value because it is modified
// to ensure source field is set.
void statusUpdate(StatusUpdate update, const Option<process::UPID>& pid);
// Called when the slave receives a `StatusUpdate` from an executor
// and the slave needs to retrieve the container status for the
// container associated with the executor.
void _statusUpdate(
StatusUpdate update,
const Option<process::UPID>& pid,
const ExecutorID& executorId,
const Option<process::Future<ContainerStatus>>& containerStatus);
// Continue handling the status update after optionally updating the
// container's resources.
void __statusUpdate(
const Option<process::Future<Nothing>>& future,
const StatusUpdate& update,
const Option<process::UPID>& pid,
const ExecutorID& executorId,
const ContainerID& containerId,
bool checkpoint);
// This is called when the task status update manager finishes
// handling the update. If the handling is successful, an
// acknowledgment is sent to the executor.
void ___statusUpdate(
const process::Future<Nothing>& future,
const StatusUpdate& update,
const Option<process::UPID>& pid);
// This is called by task status update manager to forward a status
// update to the master. Note that the latest state of the task is
// added to the update before forwarding.
void forward(StatusUpdate update);
void statusUpdateAcknowledgement(
const process::UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const TaskID& taskId,
const std::string& uuid);
void _statusUpdateAcknowledgement(
const process::Future<bool>& future,
const TaskID& taskId,
const FrameworkID& frameworkId,
const UUID& uuid);
void operationStatusAcknowledgement(
const process::UPID& from,
const AcknowledgeOperationStatusMessage& acknowledgement);
void executorLaunched(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId,
const process::Future<Containerizer::LaunchResult>& future);
// Made 'virtual' for Slave mocking.
virtual void executorTerminated(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const process::Future<Option<
mesos::slave::ContainerTermination>>& termination);
// NOTE: Pulled these to public to make it visible for testing.
// TODO(vinod): Make tests friends to this class instead.
// Garbage collects the directories based on the current disk usage.
// TODO(vinod): Instead of making this function public, we need to
// mock both GarbageCollector (and pass it through slave's constructor)
// and os calls.
void _checkDiskUsage(const process::Future<double>& usage);
// Garbage collect image layers based on the disk usage of image
// store.
void _checkImageDiskUsage(const process::Future<double>& usage);
// Invoked whenever the detector detects a change in masters.
// Made public for testing purposes.
void detected(const process::Future<Option<MasterInfo>>& _master);
enum State
{
RECOVERING, // Slave is doing recovery.
DISCONNECTED, // Slave is not connected to the master.
RUNNING, // Slave has (re-)registered.
TERMINATING, // Slave is shutting down.
} state;
// Describes information about agent recovery.
struct RecoveryInfo
{
// Flag to indicate if recovery, including reconciling
// (i.e., reconnect/kill) with executors is finished.
process::Promise<Nothing> recovered;
// Flag to indicate that HTTP based executors can
// subscribe with the agent. We allow them to subscribe
// after the agent recovers the containerizer.
bool reconnect = false;
} recoveryInfo;
// TODO(benh): Clang requires members to be public in order to take
// their address which we do in tests (for things like
// FUTURE_DISPATCH).
// protected:
void initialize() override;
void finalize() override;
void exited(const process::UPID& pid) override;
// Generates a secret for executor authentication. Returns None if there is
// no secret generator.
process::Future<Option<Secret>> generateSecret(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId);
// `executorInfo` is a mutated executor info with some default fields and
// resources. If an executor is launched for a task group, `taskInfo` would
// not be set.
void launchExecutor(
const process::Future<Option<Secret>>& authorizationToken,
const FrameworkID& frameworkId,
const ExecutorInfo& executorInfo,
const Option<TaskInfo>& taskInfo);
void fileAttached(const process::Future<Nothing>& result,
const std::string& path,
const std::string& virtualPath);
Nothing detachFile(const std::string& path);
// TODO(qianzhang): This is a workaround to make the default executor task's
// volume directory visible in MESOS UI. It handles two cases:
// 1. The task has disk resources specified. In this case any disk resources
// specified for the task are mounted on the top level container since
// currently all resources of nested containers are merged in the top
// level executor container. To make sure the task can access any volumes
// specified in its disk resources from its sandbox, a workaround was
// introduced to the default executor in MESOS-7225, i.e., adding a
// `SANDBOX_PATH` volume with type `PARENT` to the corresponding nested
// container. This volume gets translated into a bind mount in the nested
// container's mount namespace, which is not visible in Mesos UI because
// it operates in the host namespace. See MESOS-8279 for details.
// 2. The executor has disk resources specified and the task's ContainerInfo
// has a `SANDBOX_PATH` volume with type `PARENT` specified to share the
// executor's disk volume. Similar to the first case, this `SANDBOX_PATH`
// volume gets translated into a bind mount which is not visible in Mesos
// UI. See MESOS-8565 for details.
//
// To make the task's volume directory visible in Mesos UI, here we attach the
// executor's volume directory to it so that it can be accessed via the /files
// endpoint. So when users browse task's volume directory in Mesos UI, what
// they actually browse is the executor's volume directory. Note when calling
// `Files::attach()`, the third argument `authorized` is not specified because
// it is already specified when we do the attach for the executor's sandbox
// and it also applies to the executor's tasks. Note that for the second case
// we can not do the attach when the task's ContainerInfo has a `SANDBOX_PATH`
// volume with type `PARENT` but the executor has NO disk resources, because
// in such case the attach will fail due to the executor's volume directory
// not existing which will actually be created by the `volume/sandbox_path`
// isolator when launching the nested container. But if the executor has disk
// resources, then we will not have this issue since the executor's volume
// directory will be created by the `filesystem/linux` isolator when launching
// the executor before we do the attach.
void attachTaskVolumeDirectory(
const ExecutorInfo& executorInfo,
const ContainerID& executorContainerId,
const Task& task);
// TODO(qianzhang): Remove the task's volume directory from the /files
// endpoint. This is a workaround for MESOS-8279 and MESOS-8565.
void detachTaskVolumeDirectories(
const ExecutorInfo& executorInfo,
const ContainerID& executorContainerId,
const std::vector<Task>& tasks);
// Triggers a re-detection of the master when the slave does
// not receive a ping.
void pingTimeout(process::Future<Option<MasterInfo>> future);
void authenticate(Duration minTimeout, Duration maxTimeout);
// Helper routines to lookup a framework/executor.
Framework* getFramework(const FrameworkID& frameworkId) const;
Executor* getExecutor(
const FrameworkID& frameworkId,
const ExecutorID& executorId) const;
Executor* getExecutor(const ContainerID& containerId) const;
// Returns the ExecutorInfo associated with a TaskInfo. If the task has no
// ExecutorInfo, then we generate an ExecutorInfo corresponding to the
// command executor.
ExecutorInfo getExecutorInfo(
const FrameworkInfo& frameworkInfo,
const TaskInfo& task) const;
// Shuts down the executor if it did not register yet.
void registerExecutorTimeout(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId);
// Cleans up all un-reregistered executors during recovery.
void reregisterExecutorTimeout();
// This function returns the max age of executor/slave directories allowed,
// given a disk usage. This value could be used to tune gc.
Duration age(double usage);
// Checks the current disk usage and schedules for gc as necessary.
void checkDiskUsage();
// Checks the current container image disk usage and trigger image
// gc if necessary.
void checkImageDiskUsage();
// Recovers the slave, task status update manager and isolator.
process::Future<Nothing> recover(const Try<state::State>& state);
// This is called after 'recover()'. If 'flags.reconnect' is
// 'reconnect', the slave attempts to reconnect to any old live
// executors. Otherwise, the slave attempts to shutdown/kill them.
process::Future<Nothing> _recover();
// This is a helper to call recover() on the containerizer at the end of
// recover() and before __recover().
// TODO(idownes): Remove this when we support defers to objects.
process::Future<Nothing> _recoverContainerizer(
const Option<state::SlaveState>& state);
// This is called when recovery finishes.
// Made 'virtual' for Slave mocking.
virtual void __recover(const process::Future<Nothing>& future);
// Helper to recover a framework from the specified state.
void recoverFramework(
const state::FrameworkState& state,
const hashset<ExecutorID>& executorsToRecheckpoint,
const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint);
// Removes and garbage collects the executor.
void removeExecutor(Framework* framework, Executor* executor);
// Removes and garbage collects the framework.
// Made 'virtual' for Slave mocking.
virtual void removeFramework(Framework* framework);
// Schedules a 'path' for gc based on its modification time.
process::Future<Nothing> garbageCollect(const std::string& path);
// Called when the slave was signaled from the specified user.
void signaled(int signal, int uid);
// Made 'virtual' for Slave mocking.
virtual void qosCorrections();
// Made 'virtual' for Slave mocking.
virtual void _qosCorrections(
const process::Future<std::list<
mesos::slave::QoSCorrection>>& correction);
// Returns the resource usage information for all executors.
virtual process::Future<ResourceUsage> usage();
// Handle the second phase of shutting down an executor for those
// executors that have not properly shutdown within a timeout.
void shutdownExecutorTimeout(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId);
private:
friend class Executor;
friend class Framework;
friend class Http;
friend struct Metrics;
Slave(const Slave&) = delete;
Slave& operator=(const Slave&) = delete;
void _authenticate(Duration currentMinTimeout, Duration currentMaxTimeout);
// Process creation of persistent volumes (for CREATE) and/or deletion
// of persistent volumes (for DESTROY) as a part of handling
// checkpointed resources, and commit the checkpointed resources on
// successful completion of all the operations.
Try<Nothing> syncCheckpointedResources(
const Resources& newCheckpointedResources);
process::Future<bool> authorizeTask(
const TaskInfo& task,
const FrameworkInfo& frameworkInfo);
process::Future<bool> authorizeLogAccess(
const Option<process::http::authentication::Principal>& principal);
process::Future<bool> authorizeSandboxAccess(
const Option<process::http::authentication::Principal>& principal,
const FrameworkID& frameworkId,
const ExecutorID& executorId);
void sendExecutorTerminatedStatusUpdate(
const TaskID& taskId,
const process::Future<Option<
mesos::slave::ContainerTermination>>& termination,
const FrameworkID& frameworkId,
const Executor* executor);
void sendExitedExecutorMessage(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const Option<int>& status = None());
// Forwards the current total of oversubscribed resources.
void forwardOversubscribed();
void _forwardOversubscribed(
const process::Future<Resources>& oversubscribable);
// Helper functions to generate `UpdateSlaveMessage` for either
// just updates to resource provider-related information, or both
// resource provider-related information and oversubscribed
// resources.
UpdateSlaveMessage generateResourceProviderUpdate() const;
UpdateSlaveMessage generateUpdateSlaveMessage() const;
void handleResourceProviderMessage(
const process::Future<ResourceProviderMessage>& message);
void addOperation(Operation* operation);
// Transitions the operation, and recovers resource if the operation becomes
// terminal.
void updateOperation(
Operation* operation,
const UpdateOperationStatusMessage& update);
// Update the `latest_status` of the operation if it is not terminal.
void updateOperationLatestStatus(
Operation* operation,
const OperationStatus& status);
void removeOperation(Operation* operation);
Operation* getOperation(const UUID& uuid) const;
void addResourceProvider(ResourceProvider* resourceProvider);
ResourceProvider* getResourceProvider(const ResourceProviderID& id) const;
void apply(Operation* operation);
// Prepare all resources to be consumed by the specified container.
process::Future<Nothing> publishResources(
const ContainerID& containerId, const Resources& resources);
// PullGauge methods.
double _frameworks_active()
{
return static_cast<double>(frameworks.size());
}
double _uptime_secs()
{
return (process::Clock::now() - startTime).secs();
}
double _registered()
{
return master.isSome() ? 1 : 0;
}
double _tasks_staging();
double _tasks_starting();
double _tasks_running();
double _tasks_killing();
double _executors_registering();
double _executors_running();
double _executors_terminating();
double _executor_directory_max_allowed_age_secs();
double _resources_total(const std::string& name);
double _resources_used(const std::string& name);
double _resources_percent(const std::string& name);
double _resources_revocable_total(const std::string& name);
double _resources_revocable_used(const std::string& name);
double _resources_revocable_percent(const std::string& name);
// Checks whether the two `SlaveInfo` objects are considered
// compatible based on the value of the `--configuration_policy`
// flag.
Try<Nothing> compatible(
const SlaveInfo& previous,
const SlaveInfo& current) const;
void initializeResourceProviderManager(
const Flags& flags,
const SlaveID& slaveId);
protobuf::master::Capabilities requiredMasterCapabilities;
const Flags flags;
const Http http;
SlaveInfo info;
protobuf::slave::Capabilities capabilities;
// Resources that are checkpointed by the slave.
Resources checkpointedResources;
// The current total resources of the agent, i.e., `info.resources()` with
// checkpointed resources applied and resource provider resources.
Resources totalResources;
Option<process::UPID> master;
hashmap<FrameworkID, Framework*> frameworks;
// Note that these frameworks are "completed" only in that
// they no longer have any active tasks or executors on this
// particular agent.
//
// TODO(bmahler): Implement a more accurate framework lifecycle
// in the agent code, ideally the master can inform the agent
// when a framework is actually completed, and the agent can
// perhaps store a cache of "idle" frameworks. See MESOS-7890.
BoundedHashMap<FrameworkID, process::Owned<Framework>> completedFrameworks;
mesos::master::detector::MasterDetector* detector;
Containerizer* containerizer;
Files* files;
Metrics metrics;
process::Time startTime;
GarbageCollector* gc;
TaskStatusUpdateManager* taskStatusUpdateManager;
// Master detection future.
process::Future<Option<MasterInfo>> detection;
// Master's ping timeout value, updated on reregistration.
Duration masterPingTimeout;
// Timer for triggering re-detection when no ping is received from
// the master.
process::Timer pingTimer;
// Timer for triggering agent (re)registration after detecting a new master.
process::Timer agentRegistrationTimer;
// Root meta directory containing checkpointed data.
const std::string metaDir;
// Indicates the number of errors ignored in "--no-strict" recovery mode.
unsigned int recoveryErrors;
Option<Credential> credential;
// Authenticatee name as supplied via flags.
std::string authenticateeName;
Authenticatee* authenticatee;
// Indicates if an authentication attempt is in progress.
Option<process::Future<bool>> authenticating;
// Indicates if the authentication is successful.
bool authenticated;
// Indicates if a new authentication attempt should be enforced.
bool reauthenticate;
// Maximum age of executor directories. Will be recomputed
// periodically every flags.disk_watch_interval.
Duration executorDirectoryMaxAllowedAge;
mesos::slave::ResourceEstimator* resourceEstimator;
mesos::slave::QoSController* qosController;
std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
mesos::SecretGenerator* secretGenerator;
const Option<Authorizer*> authorizer;
// The most recent estimate of the total amount of oversubscribed
// (allocated and oversubscribable) resources.
Option<Resources> oversubscribedResources;
process::Owned<ResourceProviderManager> resourceProviderManager;
process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
// Local resource providers known by the agent.
hashmap<ResourceProviderID, ResourceProvider*> resourceProviders;
// Used to establish the relationship between the operation and the
// resources that the operation is operating on. Each resource
// provider will keep a resource version UUID, and change it when it
// believes that the resources from this resource provider are out
// of sync from the master's view. The master will keep track of
// the last known resource version UUID for each resource provider,
// and attach the resource version UUID in each operation it sends
// out. The resource provider should reject operations that have a
// different resource version UUID than that it maintains, because
// this means the operation is operating on resources that might
// have already been invalidated.
UUID resourceVersion;
// Keeps track of the following:
// (1) Pending operations for resources from the agent.
// (2) Pending operations or terminal operations that have
// unacknowledged status updates for resource provider
// provided resources.
hashmap<UUID, Operation*> operations;
};
// Represents the streaming HTTP connection to an executor.
struct HttpConnection
{
HttpConnection(const process::http::Pipe::Writer& _writer,
ContentType _contentType)
: writer(_writer),
contentType(_contentType),
encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
// Converts the message to an Event before sending.
template <typename Message>
bool send(const Message& message)
{
// We need to evolve the internal 'message' into a
// 'v1::executor::Event'.
return writer.write(encoder.encode(evolve(message)));
}
bool close()
{
return writer.close();
}
process::Future<Nothing> closed() const
{
return writer.readerClosed();
}
process::http::Pipe::Writer writer;
ContentType contentType;
::recordio::Encoder<v1::executor::Event> encoder;
};
std::ostream& operator<<(std::ostream& stream, const Executor& executor);
// Information describing an executor.
class Executor
{
public:
Executor(
Slave* slave,
const FrameworkID& frameworkId,
const ExecutorInfo& info,
const ContainerID& containerId,
const std::string& directory,
const Option<std::string>& user,
bool checkpoint,
bool isGeneratedForCommandTask);
~Executor();
// Note that these tasks will also be tracked within `queuedTasks`.
void enqueueTaskGroup(const TaskGroupInfo& taskGroup);
void enqueueTask(const TaskInfo& task);
Option<TaskInfo> dequeueTask(const TaskID& taskId);
Task* addLaunchedTask(const TaskInfo& task);
void completeTask(const TaskID& taskId);
void checkpointExecutor();
void checkpointTask(const TaskInfo& task);
void checkpointTask(const Task& task);
void recoverTask(const state::TaskState& state, bool recheckpointTask);
void addPendingTaskStatus(const TaskStatus& status);
void removePendingTaskStatus(const TaskStatus& status);
Try<Nothing> updateTaskState(const TaskStatus& status);
// Returns true if there are any queued/launched/terminated tasks.
bool incompleteTasks();
// Returns true if the agent ever sent any tasks to this executor.
// More precisely, this function returns whether either:
//
// (1) There are terminated/completed tasks with a
// SOURCE_EXECUTOR status.
//
// (2) `launchedTasks` is not empty.
//
// If this function returns false and there are no queued tasks,
// we probably (see TODO below) have killed or dropped all of its
// initial tasks, in which case the agent will shut down the executor.
//
// TODO(mzhu): Note however, that since we look through the cache
// of completed tasks, we can have false positives when a task
// that was delivered to the executor has been evicted from the
// completed task cache by tasks that have been killed by the
// agent before delivery. This should be fixed.
//
// NOTE: This function checks whether any tasks has ever been sent,
// this does not necessarily mean the executor has ever received
// any tasks. Specifically, tasks in `launchedTasks` may be dropped
// before received by the executor if the agent restarts.
bool everSentTask() const;
// Sends a message to the connected executor.
template <typename Message>
void send(const Message& message)
{
if (state == REGISTERING || state == TERMINATED) {
LOG(WARNING) << "Attempting to send message to disconnected"
<< " executor " << *this << " in state " << state;
}
if (http.isSome()) {
if (!http->send(message)) {
LOG(WARNING) << "Unable to send event to executor " << *this
<< ": connection closed";
}
} else if (pid.isSome()) {
slave->send(pid.get(), message);
} else {
LOG(WARNING) << "Unable to send event to executor " << *this
<< ": unknown connection type";
}
}
// Returns true if this executor is generated by Mesos for a command
// task (either command executor for MesosContainerizer or docker
// executor for DockerContainerizer).
bool isGeneratedForCommandTask() const;
// Closes the HTTP connection.
void closeHttpConnection();
// Returns the task group associated with the task.
Option<TaskGroupInfo> getQueuedTaskGroup(const TaskID& taskId);
Resources allocatedResources() const;
enum State
{
REGISTERING, // Executor is launched but not (re-)registered yet.
RUNNING, // Executor has (re-)registered.
TERMINATING, // Executor is being shutdown/killed.
TERMINATED, // Executor has terminated but there might be pending updates.
} state;
// We store the pointer to 'Slave' to get access to its methods
// variables. One could imagine 'Executor' as being an inner class
// of the 'Slave' class.
Slave* slave;
const ExecutorID id;
const ExecutorInfo info;
const FrameworkID frameworkId;
const ContainerID containerId;
const std::string directory;
// The sandbox will be owned by this user and the executor will
// run as this user. This can be set to None when --switch_user
// is false or when compiled for Windows.
const Option<std::string> user;
const bool checkpoint;
// An Executor can either be connected via HTTP or by libprocess
// message passing. The following are the possible states:
//
// Agent State Executor State http pid Executor Type
// ----------- -------------- ---- ---- -------------
// RECOVERING REGISTERING None UPID() Unknown
// REGISTERING None Some Libprocess
// REGISTERING None None HTTP
//
// * REGISTERING None None Not known yet
// * * None Some Libprocess
// * * Some None HTTP
Option<HttpConnection> http;
Option<process::UPID> pid;
// Tasks can be found in one of the following four data structures:
//
// TODO(bmahler): Make these private to enforce that the task
// lifecycle helper functions are not being bypassed, and provide
// public views into them.
// Not yet launched tasks. This also includes tasks from `queuedTaskGroups`.
LinkedHashMap<TaskID, TaskInfo> queuedTasks;
// Not yet launched task groups. This is needed for correctly sending
// TASK_KILLED status updates for all tasks in the group if any of the
// tasks were killed before the executor could register with the agent.
std::vector<TaskGroupInfo> queuedTaskGroups;
// Running.
LinkedHashMap<TaskID, Task*> launchedTasks;
// Terminated but pending updates.
LinkedHashMap<TaskID, Task*> terminatedTasks;
// Terminated and updates acked.
// NOTE: We use a shared pointer for Task because clang doesn't like
// Boost's implementation of circular_buffer with Task (Boost
// attempts to do some memset's which are unsafe).
boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
// When the slave initiates a destroy of the container, we expect a
// termination to occur. The 'pendingTermation' indicates why the
// slave initiated the destruction and will influence the
// information sent in the status updates for any remaining
// non-terminal tasks.
Option<mesos::slave::ContainerTermination> pendingTermination;
// Task status updates that are being processed by the agent.
hashmap<TaskID, LinkedHashMap<id::UUID, TaskStatus>> pendingStatusUpdates;
private:
Executor(const Executor&) = delete;
Executor& operator=(const Executor&) = delete;
bool isGeneratedForCommandTask_;
};
// Information about a framework.
class Framework
{
public:
Framework(
Slave* slave,
const Flags& slaveFlags,
const FrameworkInfo& info,
const Option<process::UPID>& pid);
~Framework();
// Returns whether the framework is idle, where idle is
// defined as having no activity:
// (1) The framework has no non-terminal tasks and executors.
// (2) All status updates have been acknowledged.
//
// TODO(bmahler): The framework should also not be considered
// idle if there are unacknowledged updates for "pending" tasks.
bool idle() const;
void checkpointFramework() const;
const FrameworkID id() const { return info.id(); }
Try<Executor*> addExecutor(
const ExecutorInfo& executorInfo,
bool isGeneratedForCommandTask);
Executor* getExecutor(const ExecutorID& executorId) const;
Executor* getExecutor(const TaskID& taskId) const;
void destroyExecutor(const ExecutorID& executorId);
void recoverExecutor(
const state::ExecutorState& state,
bool recheckpointExecutor,
const hashset<TaskID>& tasksToRecheckpoint);
void addPendingTask(
const ExecutorID& executorId,
const TaskInfo& task);
// Note that these tasks will also be tracked within `pendingTasks`.
void addPendingTaskGroup(
const ExecutorID& executorId,
const TaskGroupInfo& taskGroup);
bool hasTask(const TaskID& taskId) const;
bool isPending(const TaskID& taskId) const;
// Returns the task group associated with a pending task.
Option<TaskGroupInfo> getTaskGroupForPendingTask(const TaskID& taskId);
// Returns whether the pending task was removed.
bool removePendingTask(const TaskID& taskId);
Option<ExecutorID> getExecutorIdForPendingTask(const TaskID& taskId) const;
Resources allocatedResources() const;
enum State
{
RUNNING, // First state of a newly created framework.
TERMINATING, // Framework is shutting down in the cluster.
} state;
// We store the pointer to 'Slave' to get access to its methods and
// variables. One could imagine 'Framework' being an inner class of
// the 'Slave' class.
Slave* slave;
FrameworkInfo info;
protobuf::framework::Capabilities capabilities;
// Frameworks using the scheduler driver will have a 'pid',
// which allows us to send executor messages directly to the
// driver. Frameworks using the HTTP API (in 0.24.0) will
// not have a 'pid', in which case executor messages are
// sent through the master.
Option<process::UPID> pid;
// Executors can be found in one of the following
// data structures:
//
// TODO(bmahler): Make these private to enforce that
// the executors lifecycle helper functions are not
// being bypassed, and provide public views into them.
// Executors with pending tasks.
hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
// Sequences in this map are used to enforce the order of tasks launched on
// each executor.
//
// Note on the lifecycle of the sequence: if the corresponding executor struct
// has not been created, we tie the lifecycle of the sequence to the first
// task in the sequence (which must have the `launch_executor` flag set to
// true modulo MESOS-3870). If the task fails to launch before creating the
// executor struct, we will delete the sequence. Once the executor struct is
// created, we tie the lifecycle of the sequence to the executor struct.
//
// TODO(mzhu): Create the executor struct early and put the sequence in it.
hashmap<ExecutorID, process::Sequence> taskLaunchSequences;
// Pending task groups. This is needed for correctly sending
// TASK_KILLED status updates for all tasks in the group if
// any of the tasks are killed while pending.
std::vector<TaskGroupInfo> pendingTaskGroups;
// Current running executors.
hashmap<ExecutorID, Executor*> executors;
boost::circular_buffer<process::Owned<Executor>> completedExecutors;
private:
Framework(const Framework&) = delete;
Framework& operator=(const Framework&) = delete;
};
struct ResourceProvider
{
ResourceProvider(
const ResourceProviderInfo& _info,
const Resources& _totalResources,
const UUID& _resourceVersion)
: info(_info),
totalResources(_totalResources),
resourceVersion(_resourceVersion) {}
void addOperation(Operation* operation);
void removeOperation(Operation* operation);
ResourceProviderInfo info;
Resources totalResources;
// Used to establish the relationship between the operation and the
// resources that the operation is operating on. Each resource
// provider will keep a resource version UUID, and change it when it
// believes that the resources from this resource provider are out
// of sync from the master's view. The master will keep track of
// the last known resource version UUID for each resource provider,
// and attach the resource version UUID in each operation it sends
// out. The resource provider should reject operations that have a
// different resource version UUID than that it maintains, because
// this means the operation is operating on resources that might
// have already been invalidated.
UUID resourceVersion;
// Pending operations or terminal operations that have
// unacknowledged status updates.
hashmap<UUID, Operation*> operations;
};
/**
* Returns a map of environment variables necessary in order to launch
* an executor.
*
* @param executorInfo ExecutorInfo being launched.
* @param directory Path to the sandbox directory.
* @param slaveId SlaveID where this executor is being launched.
* @param slavePid PID of the slave launching the executor.
* @param checkpoint Whether or not the framework is checkpointing.
* @param flags Flags used to launch the slave.
*
* @return Map of environment variables (name, value).
*/
std::map<std::string, std::string> executorEnvironment(
const Flags& flags,
const ExecutorInfo& executorInfo,
const std::string& directory,
const SlaveID& slaveId,
const process::PID<Slave>& slavePid,
const Option<Secret>& authenticationToken,
bool checkpoint);
std::ostream& operator<<(std::ostream& stream, Executor::State state);
std::ostream& operator<<(std::ostream& stream, Framework::State state);
std::ostream& operator<<(std::ostream& stream, Slave::State state);
} // namespace slave {
} // namespace internal {
} // namespace mesos {
#endif // __SLAVE_HPP__