| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef __SLAVE_HPP__ |
| #define __SLAVE_HPP__ |
| |
| #include <list> |
| #include <string> |
| #include <vector> |
| |
| #include <boost/circular_buffer.hpp> |
| |
| #include <mesos/resources.hpp> |
| |
| #include <process/http.hpp> |
| #include <process/future.hpp> |
| #include <process/owned.hpp> |
| #include <process/process.hpp> |
| #include <process/protobuf.hpp> |
| |
| #include <stout/bytes.hpp> |
| #include <stout/linkedhashmap.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/multihashmap.hpp> |
| #include <stout/option.hpp> |
| #include <stout/os.hpp> |
| #include <stout/path.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include "master/detector.hpp" |
| |
| #include "slave/constants.hpp" |
| #include "slave/containerizer/containerizer.hpp" |
| #include "slave/flags.hpp" |
| #include "slave/gc.hpp" |
| #include "slave/monitor.hpp" |
| #include "slave/paths.hpp" |
| #include "slave/state.hpp" |
| |
| #include "common/attributes.hpp" |
| #include "common/protobuf_utils.hpp" |
| #include "common/type_utils.hpp" |
| |
| #include "files/files.hpp" |
| |
| #include "messages/messages.hpp" |
| |
| namespace mesos { |
| namespace internal { |
| |
// Forward-declared so that the Slave constructor below can take a
// 'MasterDetector*' parameter.
class MasterDetector;
| |
| namespace slave { |
| |
| using namespace process; |
| |
| // Some forward declarations. |
| class StatusUpdateManager; |
| struct Executor; |
| struct Framework; |
| |
| class Slave : public ProtobufProcess<Slave> |
| { |
| public: |
| Slave(const Flags& flags, |
| MasterDetector* detector, |
| Containerizer* containerizer, |
| Files* files); |
| |
| virtual ~Slave(); |
| |
| void shutdown(const process::UPID& from); |
| |
| void registered(const process::UPID& from, const SlaveID& slaveId); |
| void reregistered(const process::UPID& from, const SlaveID& slaveId); |
| void doReliableRegistration(); |
| |
| void runTask( |
| const process::UPID& from, |
| const FrameworkInfo& frameworkInfo, |
| const FrameworkID& frameworkId, |
| const std::string& pid, |
| const TaskInfo& task); |
| |
| void _runTask( |
| const Future<bool>& future, |
| const FrameworkInfo& frameworkInfo, |
| const FrameworkID& frameworkId, |
| const std::string& pid, |
| const TaskInfo& task); |
| |
| Future<bool> unschedule(const std::string& path); |
| |
| void killTask( |
| const process::UPID& from, |
| const FrameworkID& frameworkId, |
| const TaskID& taskId); |
| |
| void shutdownFramework( |
| const process::UPID& from, |
| const FrameworkID& frameworkId); |
| |
| void schedulerMessage( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const std::string& data); |
| |
| void updateFramework(const FrameworkID& frameworkId, const std::string& pid); |
| |
| void registerExecutor( |
| const process::UPID& from, |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId); |
| |
| // Called when an executor re-registers with a recovering slave. |
| // 'tasks' : Unacknowledged tasks (i.e., tasks that the executor |
| // driver never received an ACK for.) |
| // 'updates' : Unacknowledged updates. |
| void reregisterExecutor( |
| const process::UPID& from, |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const std::vector<TaskInfo>& tasks, |
| const std::vector<StatusUpdate>& updates); |
| |
| void executorMessage( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const std::string& data); |
| |
| void ping(const UPID& from, const std::string& body); |
| |
| // Handles the status update. |
| // NOTE: If 'pid' is a valid UPID an ACK is sent to this pid |
| // after the update is successfully handled. If pid == UPID() |
| // no ACK is sent. The latter is used by the slave to send |
| // status updates it generated (e.g., TASK_LOST). |
| void statusUpdate(const StatusUpdate& update, const UPID& pid); |
| |
| // This is called when the status update manager finishes |
| // handling the update. If the handling is successful, an |
| // acknowledgment is sent to the executor. |
| void _statusUpdate( |
| const Future<Nothing>& future, |
| const StatusUpdate& update, |
| const UPID& pid); |
| |
| void statusUpdateAcknowledgement( |
| const SlaveID& slaveId, |
| const FrameworkID& frameworkId, |
| const TaskID& taskId, |
| const std::string& uuid); |
| |
| void _statusUpdateAcknowledgement( |
| const Future<bool>& future, |
| const TaskID& taskId, |
| const FrameworkID& frameworkId, |
| const UUID& uuid); |
| |
| void executorStarted( |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const ContainerID& containerId, |
| const Future<Nothing>& future); |
| |
| void executorTerminated( |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const Future<Containerizer::Termination>& termination); |
| |
| // NOTE: Pulled these to public to make it visible for testing. |
| // TODO(vinod): Make tests friends to this class instead. |
| |
| // Garbage collects the directories based on the current disk usage. |
| // TODO(vinod): Instead of making this function public, we need to |
| // mock both GarbageCollector (and pass it through slave's constructor) |
| // and os calls. |
| void _checkDiskUsage(const Future<Try<double> >& usage); |
| |
| // Shut down an executor. This is a two phase process. First, an |
| // executor receives a shut down message (shut down phase), then |
| // after a configurable timeout the slave actually forces a kill |
| // (kill phase, via the isolator) if the executor has not |
| // exited. |
| void shutdownExecutor(Framework* framework, Executor* executor); |
| |
| // Invoked whenever the detector detects a change in masters. |
| // Made public for testing purposes. |
| void detected(const Future<Option<MasterInfo> >& pid); |
| |
| enum State { |
| RECOVERING, // Slave is doing recovery. |
| DISCONNECTED, // Slave is not connected to the master. |
| RUNNING, // Slave has (re-)registered. |
| TERMINATING, // Slave is shutting down. |
| } state; |
| |
| // TODO(benh): Clang requires members to be public in order to take |
| // their address which we do in tests (for things like |
| // FUTURE_DISPATCH). |
| // protected: |
| virtual void initialize(); |
| virtual void finalize(); |
| virtual void exited(const UPID& pid); |
| |
| void fileAttached(const Future<Nothing>& result, const std::string& path); |
| |
| Nothing detachFile(const std::string& path); |
| |
| // Helper routine to lookup a framework. |
| Framework* getFramework(const FrameworkID& frameworkId); |
| |
| // Returns an ExecutorInfo for a TaskInfo (possibly |
| // constructing one if the task has a CommandInfo). |
| ExecutorInfo getExecutorInfo( |
| const FrameworkID& frameworkId, |
| const TaskInfo& task); |
| |
| // Handle the second phase of shutting down an executor for those |
| // executors that have not properly shutdown within a timeout. |
| void shutdownExecutorTimeout( |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const ContainerID& containerId); |
| |
| // Shuts down the executor if it did not register yet. |
| void registerExecutorTimeout( |
| const FrameworkID& frameworkId, |
| const ExecutorID& executorId, |
| const ContainerID& containerId); |
| |
| // Cleans up all un-reregistered executors during recovery. |
| void reregisterExecutorTimeout(); |
| |
| // This function returns the max age of executor/slave directories allowed, |
| // given a disk usage. This value could be used to tune gc. |
| Duration age(double usage); |
| |
| // Checks the current disk usage and schedules for gc as necessary. |
| void checkDiskUsage(); |
| |
| // Recovers the slave, status update manager and isolator. |
| Future<Nothing> recover(const Result<state::SlaveState>& state); |
| |
| // This is called after 'recover()'. If 'flags.reconnect' is |
| // 'reconnect', the slave attempts to reconnect to any old live |
| // executors. Otherwise, the slave attempts to shutdown/kill them. |
| Future<Nothing> _recover(); |
| |
| // This is a helper to call recover() on the containerizer at the end of |
| // recover() and before __recover(). |
| // TODO(idownes): Remove this when we support defers to objects. |
| Future<Nothing> _recoverContainerizer(const Option<state::SlaveState>& state); |
| |
| // This is called when recovery finishes. |
| void __recover(const Future<Nothing>& future); |
| |
| // Helper to recover a framework from the specified state. |
| void recoverFramework(const state::FrameworkState& state); |
| |
| // Removes and garbage collects the executor. |
| void removeExecutor(Framework* framework, Executor* executor); |
| |
| // Removes and garbage collects the framework. |
| void removeFramework(Framework* framework); |
| |
| // Schedules a 'path' for gc based on its modification time. |
| Future<Nothing> garbageCollect(const std::string& path); |
| |
| private: |
| // Inner class used to namespace HTTP route handlers (see |
| // slave/http.cpp for implementations). |
| class Http |
| { |
| public: |
| Http(const Slave& _slave) : slave(_slave) {} |
| |
| // /slave/health |
| process::Future<process::http::Response> health( |
| const process::http::Request& request); |
| |
| // /slave/stats.json |
| process::Future<process::http::Response> stats( |
| const process::http::Request& request); |
| |
| // /slave/state.json |
| process::Future<process::http::Response> state( |
| const process::http::Request& request); |
| |
| static const std::string HEALTH_HELP; |
| |
| private: |
| const Slave& slave; |
| } http; |
| |
| friend struct Framework; |
| friend struct Executor; |
| |
| Slave(const Slave&); // No copying. |
| Slave& operator = (const Slave&); // No assigning. |
| |
| const Flags flags; |
| |
| SlaveInfo info; |
| |
| Option<UPID> master; |
| |
| Resources resources; |
| Attributes attributes; |
| |
| hashmap<FrameworkID, Framework*> frameworks; |
| |
| boost::circular_buffer<Owned<Framework> > completedFrameworks; |
| |
| MasterDetector* detector; |
| |
| Containerizer* containerizer; |
| |
| Files* files; |
| |
| // Statistics (initialized in Slave::initialize). |
| struct { |
| uint64_t tasks[TaskState_ARRAYSIZE]; |
| uint64_t validStatusUpdates; |
| uint64_t invalidStatusUpdates; |
| uint64_t validFrameworkMessages; |
| uint64_t invalidFrameworkMessages; |
| } stats; |
| |
| Time startTime; |
| |
| GarbageCollector gc; |
| ResourceMonitor monitor; |
| |
| StatusUpdateManager* statusUpdateManager; |
| |
| // Flag to indicate if recovery, including reconciling (i.e., reconnect/kill) |
| // with executors is finished. |
| Promise<Nothing> recovered; |
| |
| // Root meta directory containing checkpointed data. |
| const std::string metaDir; |
| |
| // Indicates the number of errors ignored in "--no-strict" recovery mode. |
| unsigned int recoveryErrors; |
| }; |
| |
| |
| // Information describing an executor. |
| struct Executor |
| { |
| Executor( |
| Slave* slave, |
| const FrameworkID& frameworkId, |
| const ExecutorInfo& info, |
| const ContainerID& containerId, |
| const std::string& directory, |
| bool checkpoint); |
| |
| ~Executor(); |
| |
| Task* addTask(const TaskInfo& task); |
| void terminateTask(const TaskID& taskId, const mesos::TaskState& state); |
| void completeTask(const TaskID& taskId); |
| void checkpointTask(const TaskInfo& task); |
| void recoverTask(const state::TaskState& state); |
| void updateTaskState(const TaskStatus& status); |
| |
| // Returns true if there are any queued/launched/terminated tasks. |
| bool incompleteTasks(); |
| |
| enum State { |
| REGISTERING, // Executor is launched but not (re-)registered yet. |
| RUNNING, // Executor has (re-)registered. |
| TERMINATING, // Executor is being shutdown/killed. |
| TERMINATED, // Executor has terminated but there might be pending updates. |
| } state; |
| |
| // We store the pointer to 'Slave' to get access to its methods |
| // variables. One could imagine 'Executor' as being an inner class |
| // of the 'Slave' class. |
| Slave* slave; |
| |
| const ExecutorID id; |
| const ExecutorInfo info; |
| |
| const FrameworkID frameworkId; |
| |
| const ContainerID containerId; |
| |
| const std::string directory; |
| |
| const bool checkpoint; |
| |
| const bool commandExecutor; |
| |
| UPID pid; |
| |
| Resources resources; // Currently consumed resources. |
| |
| // Tasks can be found in one of the following four data structures: |
| |
| // Not yet launched. |
| LinkedHashMap<TaskID, TaskInfo> queuedTasks; |
| |
| // Running. |
| LinkedHashMap<TaskID, Task*> launchedTasks; |
| |
| // Terminated but pending updates. |
| LinkedHashMap<TaskID, Task*> terminatedTasks; |
| |
| // Terminated and updates acked. |
| // NOTE: We use a shared pointer for Task because clang doesn't like |
| // Boost's implementation of circular_buffer with Task (Boost |
| // attempts to do some memset's which are unsafe). |
| boost::circular_buffer<memory::shared_ptr<Task> > completedTasks; |
| |
| private: |
| Executor(const Executor&); // No copying. |
| Executor& operator = (const Executor&); // No assigning. |
| }; |
| |
| |
| // Information about a framework. |
| struct Framework |
| { |
| Framework( |
| Slave* slave, |
| const FrameworkID& id, |
| const FrameworkInfo& info, |
| const UPID& pid); |
| |
| ~Framework(); |
| |
| Executor* launchExecutor( |
| const ExecutorInfo& executorInfo, |
| const TaskInfo& taskInfo); |
| void destroyExecutor(const ExecutorID& executorId); |
| Executor* getExecutor(const ExecutorID& executorId); |
| Executor* getExecutor(const TaskID& taskId); |
| void recoverExecutor(const state::ExecutorState& state); |
| |
| enum State { |
| RUNNING, // First state of a newly created framework. |
| TERMINATING, // Framework is shutting down in the cluster. |
| } state; |
| |
| // We store the pointer to 'Slave' to get access to its methods |
| // variables. One could imagine 'Framework' as being an inner class |
| // of the 'Slave' class. |
| Slave* slave; |
| |
| const FrameworkID id; |
| const FrameworkInfo info; |
| |
| UPID pid; |
| |
| multihashmap<ExecutorID, TaskID> pending; // Executors with pending tasks. |
| |
| // Current running executors. |
| hashmap<ExecutorID, Executor*> executors; |
| |
| // Up to MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK completed executors. |
| boost::circular_buffer<Owned<Executor> > completedExecutors; |
| private: |
| Framework(const Framework&); // No copying. |
| Framework& operator = (const Framework&); // No assigning. |
| }; |
| |
| |
| std::ostream& operator << (std::ostream& stream, Slave::State state); |
| std::ostream& operator << (std::ostream& stream, Framework::State state); |
| std::ostream& operator << (std::ostream& stream, Executor::State state); |
| |
| } // namespace slave { |
| } // namespace internal { |
| } // namespace mesos { |
| |
| #endif // __SLAVE_HPP__ |