// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <errno.h>
#include <signal.h>
#include <stdlib.h> // For random().
#include <algorithm>
#include <iomanip>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include <mesos/type_utils.hpp>
#include <mesos/module/authenticatee.hpp>
#include <process/async.hpp>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/time.hpp>
#include <stout/bytes.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/fs.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#ifdef __linux__
#include <stout/proc.hpp>
#endif // __linux__
#include <stout/numify.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>
#include <stout/utils.hpp>
#ifdef __linux__
#include "linux/cgroups.hpp"
#include "linux/fs.hpp"
#endif // __linux__
#include "authentication/cram_md5/authenticatee.hpp"
#include "common/build.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
#include "common/status_utils.hpp"
#include "credentials/credentials.hpp"
#include "hook/manager.hpp"
#include "logging/logging.hpp"
#include "module/manager.hpp"
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/status_update_manager.hpp"
using mesos::executor::Call;
using mesos::slave::QoSController;
using mesos::slave::QoSCorrection;
using mesos::slave::ResourceEstimator;
using std::list;
using std::map;
using std::set;
using std::string;
using std::vector;
using process::async;
using process::wait; // Necessary on some OSes to disambiguate.
using process::Clock;
using process::Failure;
using process::Future;
using process::Owned;
using process::Time;
using process::UPID;
namespace mesos {
namespace internal {
namespace slave {
using namespace state;
Slave::Slave(const slave::Flags& _flags,
MasterDetector* _detector,
Containerizer* _containerizer,
Files* _files,
GarbageCollector* _gc,
StatusUpdateManager* _statusUpdateManager,
ResourceEstimator* _resourceEstimator,
QoSController* _qosController)
: ProcessBase(process::ID::generate("slave")),
state(RECOVERING),
flags(_flags),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
detector(_detector),
containerizer(_containerizer),
files(_files),
metrics(*this),
gc(_gc),
monitor(defer(self(), &Self::usage)),
statusUpdateManager(_statusUpdateManager),
masterPingTimeout(DEFAULT_MASTER_PING_TIMEOUT()),
metaDir(paths::getMetaRootDir(flags.work_dir)),
recoveryErrors(0),
credential(None()),
authenticatee(NULL),
authenticating(None()),
authenticated(false),
reauthenticate(false),
executorDirectoryMaxAllowedAge(age(0)),
resourceEstimator(_resourceEstimator),
qosController(_qosController) {}
Slave::~Slave()
{
// TODO(benh): Shut down frameworks?
// TODO(benh): Shut down executors? The executor should get an "exited"
// event and initiate a shut down itself.
foreachvalue (Framework* framework, frameworks) {
delete framework;
}
delete authenticatee;
}
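// Pointer to a deferred callback, installed in Slave::initialize(), that
// forwards SIGUSR1 (along with the signaling user's uid) to
// Slave::signaled() on the slave actor; the raw signal handler below
// simply trampolines into it.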
lambda::function<void(int, int)>* signaledWrapper = NULL;
static void signalHandler(int sig, siginfo_t* siginfo, void* context)
{
if (signaledWrapper != NULL) {
(*signaledWrapper)(sig, siginfo->si_uid);
}
}
void Slave::signaled(int signal, int uid)
{
if (signal == SIGUSR1) {
Result<string> user = os::user(uid);
shutdown(
UPID(),
"Received SIGUSR1 signal" +
(user.isSome() ? " from user " + user.get() : ""));
}
}
void Slave::initialize()
{
LOG(INFO) << "Slave started on " << string(self()).substr(6);
LOG(INFO) << "Flags at startup: " << flags;
if (self().address.ip.isLoopback()) {
LOG(WARNING) << "\n**************************************************\n"
<< "Slave bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
<< " You might want to set '--ip' flag to a routable"
<< " IP address.\n"
<< "**************************************************";
}
#ifdef __linux__
// Move the slave into its own cgroup for each of the specified
// subsystems.
// NOTE: Any subsystem configuration is inherited from the mesos
// root cgroup for that subsystem, e.g., by default the memory
// cgroup will be unlimited.
if (flags.slave_subsystems.isSome()) {
foreach (const string& subsystem,
strings::tokenize(flags.slave_subsystems.get(), ",")) {
LOG(INFO) << "Moving slave process into its own cgroup for"
<< " subsystem: " << subsystem;
// Ensure the subsystem is mounted and the Mesos root cgroup is
// present.
Try<string> hierarchy = cgroups::prepare(
flags.cgroups_hierarchy,
subsystem,
flags.cgroups_root);
if (hierarchy.isError()) {
EXIT(1) << "Failed to prepare cgroup " << flags.cgroups_root
<< " for subsystem " << subsystem
<< ": " << hierarchy.error();
}
// Create a cgroup for the slave.
string cgroup = path::join(flags.cgroups_root, "slave");
Try<bool> exists = cgroups::exists(hierarchy.get(), cgroup);
if (exists.isError()) {
EXIT(1) << "Failed to find cgroup " << cgroup
<< " for subsystem " << subsystem
<< " under hierarchy " << hierarchy.get()
<< " for slave: " << exists.error();
}
if (!exists.get()) {
Try<Nothing> create = cgroups::create(hierarchy.get(), cgroup);
if (create.isError()) {
EXIT(1) << "Failed to create cgroup " << cgroup
<< " for subsystem " << subsystem
<< " under hierarchy " << hierarchy.get()
<< " for slave: " << create.error();
}
}
// Check whether there are processes running inside the cgroup - this
// may indicate a prior slave (or child process) is still running.
Try<set<pid_t>> processes = cgroups::processes(hierarchy.get(), cgroup);
if (processes.isError()) {
EXIT(1) << "Failed to check for existing threads in cgroup " << cgroup
<< " for subsystem " << subsystem
<< " under hierarchy " << hierarchy.get()
<< " for slave: " << processes.error();
}
// Log if there are any processes in the slave's cgroup. They
// may be transient helper processes like 'perf' or 'du',
// ancillary processes like 'docker log' or possibly a stuck
// slave.
// TODO(idownes): Generally, it's not a problem if there are
// processes running in the slave's cgroup, though any resources
// consumed by those processes are accounted to the slave. Where
// applicable, transient processes should be configured to
// terminate if the slave exits; see example usage for perf in
// isolators/cgroups/perf.cpp. Consider moving ancillary
// processes to a different cgroup, e.g., moving 'docker log' to
// the container's cgroup.
if (!processes.get().empty()) {
// For each process, we print its pid as well as its command
// to help triaging.
vector<string> infos;
foreach (pid_t pid, processes.get()) {
Result<os::Process> proc = os::process(pid);
// Only print the command if available.
if (proc.isSome()) {
infos.push_back(stringify(pid) + " '" + proc.get().command + "'");
} else {
infos.push_back(stringify(pid));
}
}
LOG(INFO) << "A slave (or child process) is still running, please"
<< " consider checking the following process(es) listed in "
<< path::join(hierarchy.get(), cgroup, "cgroup.procs")
<< ":\n" << strings::join("\n", infos);
}
// Move all of our threads into the cgroup.
Try<Nothing> assign = cgroups::assign(hierarchy.get(), cgroup, getpid());
if (assign.isError()) {
EXIT(1) << "Failed to move slave into cgroup " << cgroup
<< " for subsystem " << subsystem
<< " under hierarchy " << hierarchy.get()
<< " for slave: " << assign.error();
}
}
}
#endif // __linux__
if (flags.registration_backoff_factor > REGISTER_RETRY_INTERVAL_MAX) {
EXIT(1) << "Invalid value '" << flags.registration_backoff_factor << "' "
<< "for --registration_backoff_factor: "
<< "Must be less than " << REGISTER_RETRY_INTERVAL_MAX;
}
authenticateeName = flags.authenticatee;
if (flags.credential.isSome()) {
Result<Credential> _credential =
credentials::readCredential(flags.credential.get());
if (_credential.isError()) {
EXIT(1) << _credential.error() << " (see --credential flag)";
} else if (_credential.isNone()) {
EXIT(1) << "Empty credential file '" << flags.credential.get()
<< "' (see --credential flag)";
} else {
credential = _credential.get();
LOG(INFO) << "Slave using credential for: "
<< credential.get().principal();
}
}
if ((flags.gc_disk_headroom < 0) || (flags.gc_disk_headroom > 1)) {
EXIT(1) << "Invalid value '" << flags.gc_disk_headroom
<< "' for --gc_disk_headroom. Must be between 0.0 and 1.0.";
}
Try<Nothing> initialize =
resourceEstimator->initialize(defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the resource estimator: "
<< initialize.error();
}
initialize = qosController->initialize(defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the QoS Controller: "
<< initialize.error();
}
// Ensure slave work directory exists.
CHECK_SOME(os::mkdir(flags.work_dir))
<< "Failed to create slave work directory '" << flags.work_dir << "'";
Try<Resources> resources = Containerizer::resources(flags);
if (resources.isError()) {
EXIT(1) << "Failed to determine slave resources: " << resources.error();
}
// Ensure disk `source`s are accessible.
foreach (
const Resource& resource,
resources->filter([](const Resource& resource) {
return resource.has_disk() && resource.disk().has_source();
})) {
// For `PATH` sources we create them if they do not exist.
const Resource::DiskInfo::Source& source = resource.disk().source();
if (source.type() == Resource::DiskInfo::Source::PATH) {
CHECK(source.has_path());
Try<Nothing> mkdir =
os::mkdir(source.path().root(), true);
if (mkdir.isError()) {
EXIT(1) << "Failed to create DiskInfo path directory '"
<< source.path().root() << "': " << mkdir.error();
}
} else if (source.type() == Resource::DiskInfo::Source::MOUNT) {
CHECK(source.has_mount());
// For `MOUNT` sources we fail if they don't exist.
// On Linux we test the mount table for existence.
#ifdef __linux__
// Get the `realpath` of the `root` to verify it against the
// mount table entries.
// TODO(jmlvanre): Consider allowing only real paths, as
// opposed to symlinks. This would prevent an operator from
// changing the underlying data while the slave's checkpointed
// `root` still has the same value. We could also check
// the UUID of the underlying block device to catch this case.
Result<string> realpath = os::realpath(source.mount().root());
if (!realpath.isSome()) {
EXIT(1)
<< "Failed to determine `realpath` for DiskInfo mount in resource '"
<< resource << "' with path '" << source.mount().root() << "': "
<< (realpath.isError() ? realpath.error() : "no such path");
}
// TODO(jmlvanre): Consider moving this out of the for loop.
Try<fs::MountTable> mountTable = fs::MountTable::read("/proc/mounts");
if (mountTable.isError()) {
EXIT(1) << "Failed to open mount table to verify mounts: "
<< mountTable.error();
}
bool foundEntry = false;
foreach (const fs::MountTable::Entry& entry, mountTable.get().entries) {
if (entry.dir == realpath.get()) {
foundEntry = true;
break;
}
}
if (!foundEntry) {
EXIT(1) << "Failed to found mount '" << realpath.get()
<< "' in /proc/mounts";
}
#else // __linux__
// On other platforms we test whether the provided `root` exists.
if (!os::exists(source.mount().root())) {
EXIT(1) << "Failed to find mount point '" << source.mount().root();
}
#endif // __linux__
} else {
EXIT(1) << "Unsupported 'DiskInfo.Source.Type' in '" << resource << "'";
}
}
Attributes attributes;
if (flags.attributes.isSome()) {
attributes = Attributes::parse(flags.attributes.get());
}
// Determine our hostname or use the hostname provided.
string hostname;
if (flags.hostname.isNone()) {
if (flags.hostname_lookup) {
Try<string> result = net::getHostname(self().address.ip);
if (result.isError()) {
LOG(FATAL) << "Failed to get hostname: " << result.error();
}
hostname = result.get();
} else {
// We use the IP address for hostname if the user requested us
// NOT to look it up, and it wasn't explicitly set via --hostname:
hostname = stringify(self().address.ip);
}
} else {
hostname = flags.hostname.get();
}
// Initialize slave info.
info.set_hostname(hostname);
info.set_port(self().address.port);
info.mutable_resources()->CopyFrom(resources.get());
if (HookManager::hooksAvailable()) {
info.mutable_resources()->CopyFrom(
HookManager::slaveResourcesDecorator(info));
}
LOG(INFO) << "Slave resources: " << info.resources();
info.mutable_attributes()->CopyFrom(attributes);
if (HookManager::hooksAvailable()) {
info.mutable_attributes()->CopyFrom(
HookManager::slaveAttributesDecorator(info));
}
LOG(INFO) << "Slave attributes: " << info.attributes();
// Checkpointing of slaves is always enabled.
info.set_checkpoint(true);
LOG(INFO) << "Slave hostname: " << info.hostname();
statusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1));
// Start disk monitoring.
// NOTE: We send a delayed message here instead of directly calling
// checkDiskUsage, to make disabling this feature easy (e.g., by specifying
// a very large disk_watch_interval).
delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
startTime = Clock::now();
// Install protobuf handlers.
install<SlaveRegisteredMessage>(
&Slave::registered,
&SlaveRegisteredMessage::slave_id,
&SlaveRegisteredMessage::connection);
install<SlaveReregisteredMessage>(
&Slave::reregistered,
&SlaveReregisteredMessage::slave_id,
&SlaveReregisteredMessage::reconciliations,
&SlaveReregisteredMessage::connection);
install<RunTaskMessage>(
&Slave::runTask,
&RunTaskMessage::framework,
&RunTaskMessage::framework_id,
&RunTaskMessage::pid,
&RunTaskMessage::task);
install<KillTaskMessage>(
&Slave::killTask,
&KillTaskMessage::framework_id,
&KillTaskMessage::task_id);
install<ShutdownExecutorMessage>(
&Slave::shutdownExecutor,
&ShutdownExecutorMessage::framework_id,
&ShutdownExecutorMessage::executor_id);
install<ShutdownFrameworkMessage>(
&Slave::shutdownFramework,
&ShutdownFrameworkMessage::framework_id);
install<FrameworkToExecutorMessage>(
&Slave::schedulerMessage,
&FrameworkToExecutorMessage::slave_id,
&FrameworkToExecutorMessage::framework_id,
&FrameworkToExecutorMessage::executor_id,
&FrameworkToExecutorMessage::data);
install<UpdateFrameworkMessage>(
&Slave::updateFramework,
&UpdateFrameworkMessage::framework_id,
&UpdateFrameworkMessage::pid);
install<CheckpointResourcesMessage>(
&Slave::checkpointResources,
&CheckpointResourcesMessage::resources);
install<StatusUpdateAcknowledgementMessage>(
&Slave::statusUpdateAcknowledgement,
&StatusUpdateAcknowledgementMessage::slave_id,
&StatusUpdateAcknowledgementMessage::framework_id,
&StatusUpdateAcknowledgementMessage::task_id,
&StatusUpdateAcknowledgementMessage::uuid);
install<RegisterExecutorMessage>(
&Slave::registerExecutor,
&RegisterExecutorMessage::framework_id,
&RegisterExecutorMessage::executor_id);
install<ReregisterExecutorMessage>(
&Slave::reregisterExecutor,
&ReregisterExecutorMessage::framework_id,
&ReregisterExecutorMessage::executor_id,
&ReregisterExecutorMessage::tasks,
&ReregisterExecutorMessage::updates);
install<StatusUpdateMessage>(
&Slave::statusUpdate,
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
install<ExecutorToFrameworkMessage>(
&Slave::executorMessage,
&ExecutorToFrameworkMessage::slave_id,
&ExecutorToFrameworkMessage::framework_id,
&ExecutorToFrameworkMessage::executor_id,
&ExecutorToFrameworkMessage::data);
install<ShutdownMessage>(
&Slave::shutdown,
&ShutdownMessage::message);
install<PingSlaveMessage>(
&Slave::ping,
&PingSlaveMessage::connected);
// Setup HTTP routes.
Http http = Http(this);
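// NOTE: The 'http' helper is captured by value in each route lambda below,
// so every handler keeps its own copy bound to this slave instance.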
route("/api/v1/executor",
Http::EXECUTOR_HELP(),
[http](const process::http::Request& request) {
Http::log(request);
return http.executor(request);
});
// TODO(ijimenez): Remove this endpoint at the end of the
// deprecation cycle on 0.26.
route("/state.json",
Http::STATE_HELP(),
[http](const process::http::Request& request) {
Http::log(request);
return http.state(request);
});
route("/state",
Http::STATE_HELP(),
[http](const process::http::Request& request) {
Http::log(request);
return http.state(request);
});
route("/flags",
Http::FLAGS_HELP(),
[http](const process::http::Request& request) {
Http::log(request);
return http.flags(request);
});
route("/health",
Http::HEALTH_HELP(),
[http](const process::http::Request& request) {
return http.health(request);
});
// Expose the log file for the webui. Fall back to 'log_dir' if
// an explicit file was not specified.
if (flags.external_log_file.isSome()) {
files->attach(flags.external_log_file.get(), "/slave/log")
.onAny(defer(self(),
&Self::fileAttached,
lambda::_1,
flags.external_log_file.get()));
} else if (flags.log_dir.isSome()) {
Try<string> log =
logging::getLogFile(logging::getLogSeverity(flags.logging_level));
if (log.isError()) {
LOG(ERROR) << "Slave log file cannot be found: " << log.error();
} else {
files->attach(log.get(), "/slave/log")
.onAny(defer(self(), &Self::fileAttached, lambda::_1, log.get()));
}
}
// Check that the recover flag is valid.
if (flags.recover != "reconnect" && flags.recover != "cleanup") {
EXIT(1) << "Unknown option for 'recover' flag " << flags.recover
<< ". Please run the slave with '--help' to see the valid options";
}
struct sigaction action;
memset(&action, 0, sizeof(struct sigaction));
// Do not block additional signals while in the handler.
sigemptyset(&action.sa_mask);
// The SA_SIGINFO flag tells sigaction() to use
// the sa_sigaction field, not sa_handler.
action.sa_flags = SA_SIGINFO;
signaledWrapper = new lambda::function<void(int, int)>(
defer(self(), &Slave::signaled, lambda::_1, lambda::_2));
action.sa_sigaction = signalHandler;
if (sigaction(SIGUSR1, &action, NULL) < 0) {
EXIT(1) << "Failed to set sigaction: " << os::strerror(errno);
}
// Do recovery.
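// The checkpointed state is read asynchronously and then threaded through
// 'recover()' and '_recover()'; '__recover()' is hooked via 'onAny' so it
// runs regardless of whether the earlier recovery steps succeeded.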
async(&state::recover, metaDir, flags.strict)
.then(defer(self(), &Slave::recover, lambda::_1))
.then(defer(self(), &Slave::_recover))
.onAny(defer(self(), &Slave::__recover, lambda::_1));
}
void Slave::finalize()
{
LOG(INFO) << "Slave terminating";
// NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
// can potentially remove a framework from 'frameworks'.
foreach (const FrameworkID& frameworkId, frameworks.keys()) {
// TODO(benh): Because a shut down isn't instantaneous (but has
// shut down and kill phases) we might not actually propagate all
// the status updates appropriately here. Consider providing
// an alternative function which skips the shut down phase and
// simply does a kill (sending all status updates
// immediately). Of course, this still isn't sufficient
// because those status updates might get lost and we won't
// resend them unless we build that into the system.
// NOTE: We shut down the framework only if it has disabled
// checkpointing. This is because slave recovery tests terminate
// the slave to simulate slave restart.
if (!frameworks[frameworkId]->info.checkpoint()) {
shutdownFramework(UPID(), frameworkId);
}
}
if (state == TERMINATING) {
// We remove the "latest" symlink in meta directory, so that the
// slave doesn't recover the state when it restarts and registers
// as a new slave with the master.
if (os::exists(paths::getLatestSlavePath(metaDir))) {
CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
}
}
}
void Slave::shutdown(const UPID& from, const string& message)
{
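// NOTE: 'from' is an empty UPID when shutdown() is invoked locally (e.g.,
// from Slave::signaled()), in which case the registered-master check below
// is skipped.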
if (from && master != from) {
LOG(WARNING) << "Ignoring shutdown message from " << from
<< " because it is not from the registered master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
if (from) {
LOG(INFO) << "Slave asked to shut down by " << from
<< (message.empty() ? "" : " because '" + message + "'");
} else if (info.has_id()) {
if (message.empty()) {
LOG(INFO) << "Unregistering and shutting down";
} else {
LOG(INFO) << message << "; unregistering and shutting down";
}
UnregisterSlaveMessage message_;
message_.mutable_slave_id()->MergeFrom(info.id());
send(master.get(), message_);
} else {
if (message.empty()) {
LOG(INFO) << "Shutting down";
} else {
LOG(INFO) << message << "; shutting down";
}
}
state = TERMINATING;
if (frameworks.empty()) { // Terminate slave if there are no frameworks.
terminate(self());
} else {
// NOTE: The slave will terminate after all the executors have
// terminated.
// NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
// can potentially remove a framework from 'frameworks'.
foreach (const FrameworkID& frameworkId, frameworks.keys()) {
shutdownFramework(from, frameworkId);
}
}
}
void Slave::fileAttached(const Future<Nothing>& result, const string& path)
{
if (result.isReady()) {
VLOG(1) << "Successfully attached file '" << path << "'";
} else {
LOG(ERROR) << "Failed to attach file '" << path << "': "
<< (result.isFailed() ? result.failure() : "discarded");
}
}
// TODO(vinod/bmahler): Get rid of this helper.
Nothing Slave::detachFile(const string& path)
{
files->detach(path);
return Nothing();
}
void Slave::detected(const Future<Option<MasterInfo>>& _master)
{
CHECK(state == DISCONNECTED ||
state == RUNNING ||
state == TERMINATING) << state;
if (state != TERMINATING) {
state = DISCONNECTED;
}
// Pause the status updates.
statusUpdateManager->pause();
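// Updates will be resumed in 'registered()' or 'reregistered()' once the
// slave is (re-)registered with a master.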
if (_master.isFailed()) {
EXIT(1) << "Failed to detect a master: " << _master.failure();
}
Option<MasterInfo> latest;
if (_master.isDiscarded()) {
LOG(INFO) << "Re-detecting master";
latest = None();
master = None();
} else if (_master.get().isNone()) {
LOG(INFO) << "Lost leading master";
latest = None();
master = None();
} else {
latest = _master.get();
master = UPID(_master.get().get().pid());
LOG(INFO) << "New master detected at " << master.get();
link(master.get());
if (state == TERMINATING) {
LOG(INFO) << "Skipping registration because slave is terminating";
return;
}
// Wait for a random amount of time before authentication or
// registration.
Duration duration =
flags.registration_backoff_factor * ((double) ::random() / RAND_MAX);
if (credential.isSome()) {
// Authenticate with the master.
// TODO(vinod): Do a backoff for authentication similar to what
// we do for registration. This is a little tricky because, if
// we delay 'Slave::authenticate' and a new master is detected
// before 'authenticate' event is processed the slave tries to
// authenticate with the new master twice.
// TODO(vinod): Consider adding an "AUTHENTICATED" state to the
// slave instead of "authenticate" variable.
authenticate();
} else {
// Proceed with registration without authentication.
LOG(INFO) << "No credentials provided."
<< " Attempting to register without authentication";
delay(duration,
self(),
&Slave::doReliableRegistration,
flags.registration_backoff_factor * 2); // Backoff.
}
}
// Keep detecting masters.
LOG(INFO) << "Detecting new master";
detection = detector->detect(latest)
.onAny(defer(self(), &Slave::detected, lambda::_1));
}
void Slave::authenticate()
{
authenticated = false;
if (master.isNone()) {
return;
}
if (authenticating.isSome()) {
// Authentication is in progress. Try to cancel it.
// Note that it is possible that 'authenticating' is ready
// and the dispatch to '_authenticate' is enqueued when we
// are here, making the 'discard' here a no-op. This is ok
// because we set 'reauthenticate' here which enforces a retry
// in '_authenticate'.
Future<bool> authenticating_ = authenticating.get();
authenticating_.discard();
reauthenticate = true;
return;
}
LOG(INFO) << "Authenticating with master " << master.get();
CHECK(authenticatee == NULL);
if (authenticateeName == DEFAULT_AUTHENTICATEE) {
LOG(INFO) << "Using default CRAM-MD5 authenticatee";
authenticatee = new cram_md5::CRAMMD5Authenticatee();
} else {
Try<Authenticatee*> module =
modules::ModuleManager::create<Authenticatee>(authenticateeName);
if (module.isError()) {
EXIT(1) << "Could not create authenticatee module '"
<< authenticateeName << "': " << module.error();
}
LOG(INFO) << "Using '" << authenticateeName << "' authenticatee";
authenticatee = module.get();
}
CHECK_SOME(credential);
authenticating =
authenticatee->authenticate(master.get(), self(), credential.get())
.onAny(defer(self(), &Self::_authenticate));
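// Limit the authentication attempt to 5 seconds; on timeout the future is
// discarded, which causes '_authenticate()' to retry.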
delay(Seconds(5), self(), &Self::authenticationTimeout, authenticating.get());
}
void Slave::_authenticate()
{
delete CHECK_NOTNULL(authenticatee);
authenticatee = NULL;
CHECK_SOME(authenticating);
const Future<bool>& future = authenticating.get();
if (master.isNone()) {
LOG(INFO) << "Ignoring _authenticate because the master is lost";
authenticating = None();
// Set it to false because we do not want further retries until
// a new master is detected.
// We obviously do not need to reauthenticate either even if
// 'reauthenticate' is currently true because the master is
// lost.
reauthenticate = false;
return;
}
if (reauthenticate || !future.isReady()) {
LOG(WARNING)
<< "Failed to authenticate with master " << master.get() << ": "
<< (reauthenticate ? "master changed" :
(future.isFailed() ? future.failure() : "future discarded"));
authenticating = None();
reauthenticate = false;
// TODO(vinod): Add a limit on number of retries.
dispatch(self(), &Self::authenticate); // Retry.
return;
}
if (!future.get()) {
// For refused authentication, we exit instead of doing a shutdown
// to keep possibly active executors running.
EXIT(1) << "Master " << master.get() << " refused authentication";
}
LOG(INFO) << "Successfully authenticated with master " << master.get();
authenticated = true;
authenticating = None();
// Proceed with registration.
doReliableRegistration(flags.registration_backoff_factor * 2);
}
void Slave::authenticationTimeout(Future<bool> future)
{
// NOTE: Discarded future results in a retry in '_authenticate()'.
// Also note that a 'discard' here is safe even if another
// authenticator is in progress because this copy of the future
// corresponds to the original authenticator that started the timer.
if (future.discard()) { // This is a no-op if the future is already ready.
LOG(WARNING) << "Authentication timed out";
}
}
void Slave::registered(
const UPID& from,
const SlaveID& slaveId,
const MasterSlaveConnection& connection)
{
if (master != from) {
LOG(WARNING) << "Ignoring registration message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
CHECK_SOME(master);
if (connection.has_total_ping_timeout_seconds()) {
masterPingTimeout = Seconds(connection.total_ping_timeout_seconds());
} else {
masterPingTimeout = DEFAULT_MASTER_PING_TIMEOUT();
}
switch (state) {
case DISCONNECTED: {
LOG(INFO) << "Registered with master " << master.get()
<< "; given slave ID " << slaveId;
// TODO(bernd-mesos): Make this an instance method call, see comment
// in "fetcher.hpp"".
Try<Nothing> recovered = Fetcher::recover(slaveId, flags);
if (recovered.isError()) {
LOG(FATAL) << "Could not initialize fetcher cache: "
<< recovered.error();
}
state = RUNNING;
statusUpdateManager->resume(); // Resume status updates.
info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
// Create the slave meta directory.
paths::createSlaveDirectory(metaDir, slaveId);
// Checkpoint slave info.
const string path = paths::getSlaveInfoPath(metaDir, slaveId);
VLOG(1) << "Checkpointing SlaveInfo to '" << path << "'";
CHECK_SOME(state::checkpoint(path, info));
// If we don't get a ping from the master, trigger a
// re-registration. This needs to be done once registered,
// in case we never receive an initial ping.
Clock::cancel(pingTimer);
pingTimer = delay(
masterPingTimeout,
self(),
&Slave::pingTimeout,
detection);
break;
}
case RUNNING:
// Already registered!
if (!(info.id() == slaveId)) {
EXIT(1) << "Registered but got wrong id: " << slaveId
<< "(expected: " << info.id() << "). Committing suicide";
}
LOG(WARNING) << "Already registered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring registration because slave is terminating";
break;
case RECOVERING:
default:
LOG(FATAL) << "Unexpected slave state " << state;
break;
}
// Send the latest estimate for oversubscribed resources.
if (oversubscribedResources.isSome()) {
LOG(INFO) << "Forwarding total oversubscribed resources "
<< oversubscribedResources.get();
UpdateSlaveMessage message;
message.mutable_slave_id()->CopyFrom(info.id());
message.mutable_oversubscribed_resources()->CopyFrom(
oversubscribedResources.get());
send(master.get(), message);
}
}
void Slave::reregistered(
const UPID& from,
const SlaveID& slaveId,
const vector<ReconcileTasksMessage>& reconciliations,
const MasterSlaveConnection& connection)
{
if (master != from) {
LOG(WARNING) << "Ignoring re-registration message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
CHECK_SOME(master);
if (!(info.id() == slaveId)) {
EXIT(1) << "Re-registered but got wrong id: " << slaveId
<< "(expected: " << info.id() << "). Committing suicide";
}
if (connection.has_total_ping_timeout_seconds()) {
masterPingTimeout = Seconds(connection.total_ping_timeout_seconds());
} else {
masterPingTimeout = DEFAULT_MASTER_PING_TIMEOUT();
}
switch (state) {
case DISCONNECTED:
LOG(INFO) << "Re-registered with master " << master.get();
state = RUNNING;
statusUpdateManager->resume(); // Resume status updates.
// If we don't get a ping from the master, trigger a
// re-registration. This needs to be done once re-registered,
// in case we never receive an initial ping.
Clock::cancel(pingTimer);
pingTimer = delay(
masterPingTimeout,
self(),
&Slave::pingTimeout,
detection);
break;
case RUNNING:
LOG(WARNING) << "Already re-registered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring re-registration because slave is terminating";
return;
case RECOVERING:
// It's possible to receive a message intended for the previous
// run of the slave here. Short term we can leave this as is and
// crash in this case. Ideally responses can be tied to a
// particular run of the slave, see:
// https://issues.apache.org/jira/browse/MESOS-676
// https://issues.apache.org/jira/browse/MESOS-677
default:
LOG(FATAL) << "Unexpected slave state " << state;
return;
}
// Send the latest estimate for oversubscribed resources.
if (oversubscribedResources.isSome()) {
LOG(INFO) << "Forwarding total oversubscribed resources "
<< oversubscribedResources.get();
UpdateSlaveMessage message;
message.mutable_slave_id()->CopyFrom(info.id());
message.mutable_oversubscribed_resources()->CopyFrom(
oversubscribedResources.get());
send(master.get(), message);
}
// Reconcile any tasks per the master's request.
foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
Framework* framework = getFramework(reconcile.framework_id());
foreach (const TaskStatus& status, reconcile.statuses()) {
const TaskID& taskId = status.task_id();
bool known = false;
// Try to locate the task.
if (framework != NULL) {
foreachkey (const ExecutorID& executorId, framework->pending) {
if (framework->pending[executorId].contains(taskId)) {
known = true;
}
}
foreachvalue (Executor* executor, framework->executors) {
if (executor->queuedTasks.contains(taskId) ||
executor->launchedTasks.contains(taskId) ||
executor->terminatedTasks.contains(taskId)) {
known = true;
}
}
}
// We only need to send a TASK_LOST update when the task is
// unknown (so that the master removes it). Otherwise, the
// master correctly holds the task and will receive updates.
if (!known) {
LOG(WARNING) << "Slave reconciling task " << taskId
<< " of framework " << reconcile.framework_id()
<< " in state TASK_LOST: task unknown to the slave";
const StatusUpdate update = protobuf::createStatusUpdate(
reconcile.framework_id(),
info.id(),
taskId,
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Reconciliation: task unknown to the slave",
TaskStatus::REASON_RECONCILIATION);
// NOTE: We can't use statusUpdate() here because it drops
// updates for unknown frameworks.
statusUpdateManager->update(update, info.id())
.onAny(defer(self(),
&Slave::__statusUpdate,
lambda::_1,
update,
UPID()));
}
}
}
}
void Slave::doReliableRegistration(Duration maxBackoff)
{
if (master.isNone()) {
LOG(INFO) << "Skipping registration because no master present";
return;
}
if (credential.isSome() && !authenticated) {
LOG(INFO) << "Skipping registration because not authenticated";
return;
}
if (state == RUNNING) { // Slave (re-)registered with the master.
return;
}
if (state == TERMINATING) {
LOG(INFO) << "Skipping registration because slave is terminating";
return;
}
CHECK(state == DISCONNECTED) << state;
CHECK_NE("cleanup", flags.recover);
if (!info.has_id()) {
// Registering for the first time.
RegisterSlaveMessage message;
message.set_version(MESOS_VERSION);
message.mutable_slave()->CopyFrom(info);
// Include checkpointed resources.
message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
send(master.get(), message);
} else {
// Re-registering, so send tasks running.
ReregisterSlaveMessage message;
message.set_version(MESOS_VERSION);
// Include checkpointed resources.
message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
message.mutable_slave()->CopyFrom(info);
foreachvalue (Framework* framework, frameworks) {
// TODO(bmahler): We need to send the executors for these
// pending tasks, and we need to send exited events if they
// cannot be launched: MESOS-1715 MESOS-1720.
typedef hashmap<TaskID, TaskInfo> TaskMap;
foreachvalue (const TaskMap& tasks, framework->pending) {
foreachvalue (const TaskInfo& task, tasks) {
message.add_tasks()->CopyFrom(protobuf::createTask(
task, TASK_STAGING, framework->id()));
}
}
foreachvalue (Executor* executor, framework->executors) {
// Add launched, terminated, and queued tasks.
// Note that terminated executors will only have terminated
// unacknowledged tasks.
// Note that for each task the latest state and status update
// state (if any) is also included.
foreach (Task* task, executor->launchedTasks.values()) {
message.add_tasks()->CopyFrom(*task);
}
foreach (Task* task, executor->terminatedTasks.values()) {
message.add_tasks()->CopyFrom(*task);
}
foreach (const TaskInfo& task, executor->queuedTasks.values()) {
message.add_tasks()->CopyFrom(protobuf::createTask(
task, TASK_STAGING, framework->id()));
}
// Do not re-register with Command Executors because the
// master doesn't store them; they are generated by the slave.
if (executor->isCommandExecutor()) {
// NOTE: We have to unset the executor id here for the task
// because the master uses the absence of task.executor_id()
// to detect command executors.
for (int i = 0; i < message.tasks_size(); ++i) {
message.mutable_tasks(i)->clear_executor_id();
}
} else {
// Ignore terminated executors because they do not consume
// any resources.
if (executor->state != Executor::TERMINATED) {
ExecutorInfo* executorInfo = message.add_executor_infos();
executorInfo->MergeFrom(executor->info);
// Scheduler Driver will ensure the framework id is set in
// ExecutorInfo, effectively making it a required field.
CHECK(executorInfo->has_framework_id());
}
}
}
}
// Add completed frameworks.
foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
VLOG(1) << "Reregistering completed framework "
<< completedFramework->id();
Archive::Framework* completedFramework_ =
message.add_completed_frameworks();
completedFramework_->mutable_framework_info()->CopyFrom(
completedFramework->info);
if (completedFramework->pid.isSome()) {
completedFramework_->set_pid(completedFramework->pid.get());
}
foreach (const Owned<Executor>& executor,
completedFramework->completedExecutors) {
VLOG(2) << "Reregistering completed executor '" << executor->id
<< "' with " << executor->terminatedTasks.size()
<< " terminated tasks, " << executor->completedTasks.size()
<< " completed tasks";
foreach (const Task* task, executor->terminatedTasks.values()) {
VLOG(2) << "Reregistering terminated task " << task->task_id();
completedFramework_->add_tasks()->CopyFrom(*task);
}
foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
VLOG(2) << "Reregistering completed task " << task->task_id();
completedFramework_->add_tasks()->CopyFrom(*task);
}
}
}
CHECK_SOME(master);
send(master.get(), message);
}
// Bound the maximum backoff by 'REGISTER_RETRY_INTERVAL_MAX'.
maxBackoff = std::min(maxBackoff, REGISTER_RETRY_INTERVAL_MAX);
// Determine the delay for next attempt by picking a random
// duration between 0 and 'maxBackoff'.
Duration delay = maxBackoff * ((double) ::random() / RAND_MAX);
VLOG(1) << "Will retry registration in " << delay << " if necessary";
// Backoff.
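// Each attempt doubles 'maxBackoff' (capped above by
// REGISTER_RETRY_INTERVAL_MAX), giving a bounded exponential backoff with
// random jitter.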
process::delay(delay, self(), &Slave::doReliableRegistration, maxBackoff * 2);
}
// Helper to unschedule the path.
// TODO(vinod): Can we avoid this helper?
Future<bool> Slave::unschedule(const string& path)
{
return gc->unschedule(path);
}
// TODO(vinod): Instead of crashing the slave on checkpoint errors,
// send TASK_LOST to the framework.
void Slave::runTask(
const UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId_,
const UPID& pid,
TaskInfo task)
{
if (master != from) {
LOG(WARNING) << "Ignoring run task message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
if (!frameworkInfo.has_id()) {
LOG(ERROR) << "Ignoring run task message from " << from
<< " because it does not have a framework ID";
return;
}
// Create frameworkId alias to use in the rest of the function.
const FrameworkID frameworkId = frameworkInfo.id();
LOG(INFO) << "Got assigned task " << task.task_id()
<< " for framework " << frameworkId;
if (!(task.slave_id() == info.id())) {
LOG(WARNING)
<< "Slave " << info.id() << " ignoring task " << task.task_id()
<< " because it was intended for old slave " << task.slave_id();
return;
}
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
// TODO(bmahler): Also ignore if we're DISCONNECTED.
if (state == RECOVERING || state == TERMINATING) {
LOG(WARNING) << "Ignoring task " << task.task_id()
<< " because the slave is " << state;
// TODO(vinod): Consider sending a TASK_LOST here.
// Currently it is tricky because 'statusUpdate()'
// ignores updates for unknown frameworks.
return;
}
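// Seed the GC 'unschedule' chain with an already-satisfied future; any
// framework/executor directories that must be preserved are chained onto
// it below via '.then()'.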
Future<bool> unschedule = true;
// If we are about to create a new framework, unschedule the work
// and meta directories from getting gc'ed.
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
// Unschedule framework work directory.
string path = paths::getFrameworkPath(
flags.work_dir, info.id(), frameworkId);
if (os::exists(path)) {
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
// Unschedule framework meta directory.
path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
if (os::exists(path)) {
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
Option<UPID> frameworkPid = None();
if (pid != UPID()) {
frameworkPid = pid;
}
framework = new Framework(this, frameworkInfo, frameworkPid);
frameworks[frameworkId] = framework;
if (frameworkInfo.checkpoint()) {
framework->checkpointFramework();
}
// Is this same framework in completedFrameworks? If so, move the completed
// executors to this framework and remove it from that list.
// TODO(brenden): Consider using stout/cache.hpp instead of boost
// circular_buffer.
for (auto it = completedFrameworks.begin(), end = completedFrameworks.end();
it != end;
++it) {
if ((*it)->id() == frameworkId) {
framework->completedExecutors = (*it)->completedExecutors;
completedFrameworks.erase(it);
break;
}
}
}
const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
const ExecutorID& executorId = executorInfo.executor_id();
if (HookManager::hooksAvailable()) {
// Set task labels from run task label decorator.
task.mutable_labels()->CopyFrom(HookManager::slaveRunTaskLabelDecorator(
task, executorInfo, frameworkInfo, info));
}
// We add the task to 'pending' to ensure the framework is not
// removed and the framework and top level executor directories
// are not scheduled for deletion before '_runTask()' is called.
CHECK_NOTNULL(framework);
framework->pending[executorId][task.task_id()] = task;
// If we are about to create a new executor, unschedule the top
// level work and meta directories from getting gc'ed.
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
// Unschedule executor work directory.
string path = paths::getExecutorPath(
flags.work_dir, info.id(), frameworkId, executorId);
if (os::exists(path)) {
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
// Unschedule executor meta directory.
path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
if (os::exists(path)) {
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
}
// Run the task after the unschedules are done.
unschedule.onAny(
defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
}
void Slave::_runTask(
const Future<bool>& future,
const FrameworkInfo& frameworkInfo,
const TaskInfo& task)
{
const FrameworkID frameworkId = frameworkInfo.id();
LOG(INFO) << "Launching task " << task.task_id()
<< " for framework " << frameworkId;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring run task " << task.task_id()
<< " because the framework " << frameworkId
<< " does not exist";
return;
}
const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
const ExecutorID& executorId = executorInfo.executor_id();
if (framework->pending.contains(executorId) &&
framework->pending[executorId].contains(task.task_id())) {
framework->pending[executorId].erase(task.task_id());
if (framework->pending[executorId].empty()) {
framework->pending.erase(executorId);
// NOTE: Ideally we would perform the following check here:
//
// if (framework->executors.empty() &&
// framework->pending.empty()) {
// removeFramework(framework);
// }
//
// However, we need 'framework' to stay valid for the rest of
// this function. As such, we perform the check before each of
// the 'return' statements below.
}
} else {
LOG(WARNING) << "Ignoring run task " << task.task_id()
<< " of framework " << frameworkId
<< " because the task has been killed in the meantime";
return;
}
// We don't send a status update here because a terminating
// framework cannot send acknowledgements.
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring run task " << task.task_id()
<< " of framework " << frameworkId
<< " because the framework is terminating";
// Refer to the comment after 'framework->pending.erase' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
return;
}
if (!future.isReady()) {
LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
<< (future.isFailed() ? future.failure() : "future discarded");
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
task.task_id(),
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Could not launch the task because we failed to unschedule directories"
" scheduled for gc",
TaskStatus::REASON_GC_ERROR);
// TODO(vinod): Ensure that the status update manager reliably
// delivers this update. Currently, we don't guarantee this
// because removal of the framework causes the status update
// manager to stop retrying for its un-acked updates.
statusUpdate(update, UPID());
// Refer to the comment after 'framework->pending.erase' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
return;
}
// NOTE: If the task or executor uses resources that are
// checkpointed on the slave (e.g. persistent volumes), we should
// already know about it. If the slave doesn't know about them (e.g.
// CheckpointResourcesMessage was dropped or came out of order),
// we send TASK_LOST status updates here since restarting the task
// may succeed in the event that CheckpointResourcesMessage arrives
// out of order.
Resources checkpointedTaskResources =
Resources(task.resources()).filter(needCheckpointing);
foreach (const Resource& resource, checkpointedTaskResources) {
if (!checkpointedResources.contains(resource)) {
LOG(WARNING) << "Unknown checkpointed resource " << resource
<< " for task " << task.task_id()
<< " of framework " << frameworkId;
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
task.task_id(),
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"The checkpointed resources being used by the task are unknown to "
"the slave",
TaskStatus::REASON_RESOURCES_UNKNOWN);
statusUpdate(update, UPID());
// Refer to the comment after 'framework->pending.erase' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
return;
}
}
if (task.has_executor()) {
Resources checkpointedExecutorResources =
Resources(task.executor().resources()).filter(needCheckpointing);
foreach (const Resource& resource, checkpointedExecutorResources) {
if (!checkpointedResources.contains(resource)) {
LOG(WARNING) << "Unknown checkpointed resource " << resource
<< " for executor '" << task.executor().executor_id()
<< "' of framework " << frameworkId;
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
task.task_id(),
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"The checkpointed resources being used by the executor are unknown "
"to the slave",
TaskStatus::REASON_RESOURCES_UNKNOWN,
task.executor().executor_id());
statusUpdate(update, UPID());
// Refer to the comment after 'framework->pending.erase' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
return;
}
}
}
// NOTE: The slave cannot be in 'RECOVERING' because the task would
// have been rejected in 'runTask()' in that case.
CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
<< state;
if (state == TERMINATING) {
LOG(WARNING) << "Ignoring run task " << task.task_id()
<< " of framework " << frameworkId
<< " because the slave is terminating";
// Refer to the comment after 'framework->pending.erase' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
// We don't send a TASK_LOST here because the slave is
// terminating.
return;
}
CHECK(framework->state == Framework::RUNNING) << framework->state;
// Either send the task to an executor or start a new executor
// and queue the task until the executor has started.
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
executor = framework->launchExecutor(executorInfo, task);
}
CHECK_NOTNULL(executor);
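// At this point an executor exists (either pre-existing or just launched);
// how the task is handled depends on the executor's lifecycle state.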
switch (executor->state) {
case Executor::TERMINATING:
case Executor::TERMINATED: {
LOG(WARNING) << "Asked to run task '" << task.task_id()
<< "' for framework " << frameworkId
<< " with executor '" << executorId
<< "' which is terminating/terminated";
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
task.task_id(),
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Executor terminating/terminated",
TaskStatus::REASON_EXECUTOR_TERMINATED);
statusUpdate(update, UPID());
break;
}
case Executor::REGISTERING:
// Checkpoint the task before we do anything else.
if (executor->checkpoint) {
executor->checkpointTask(task);
}
// Queue task if the executor has not yet registered.
LOG(INFO) << "Queuing task '" << task.task_id()
<< "' for executor " << *executor;
executor->queuedTasks[task.task_id()] = task;
break;
case Executor::RUNNING: {
// Checkpoint the task before we do anything else.
if (executor->checkpoint) {
executor->checkpointTask(task);
}
// Queue task until the containerizer is updated with new
// resource limits (MESOS-998).
LOG(INFO) << "Queuing task '" << task.task_id()
<< "' for executor " << *executor;
executor->queuedTasks[task.task_id()] = task;
// Update the resource limits for the container. Note that the
// resource limits include the currently queued tasks because we
// want the container to have enough resources to hold the
// upcoming tasks.
Resources resources = executor->resources;
// TODO(jieyu): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (const TaskInfo& task, executor->queuedTasks.values()) {
resources += task.resources();
}
containerizer->update(executor->containerId, resources)
.onAny(defer(self(),
&Self::runTasks,
lambda::_1,
frameworkId,
executorId,
executor->containerId,
list<TaskInfo>({task})));
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
// We don't perform the checks for 'removeFramework' here since
// we're guaranteed by 'launchExecutor' that 'framework->executors'
// will be non-empty.
CHECK(!framework->executors.empty());
}
void Slave::runTasks(
const Future<Nothing>& future,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId,
const list<TaskInfo>& tasks)
{
// Store all task IDs for logging.
vector<TaskID> taskIds;
foreach (const TaskInfo& task, tasks) {
taskIds.push_back(task.task_id());
}
if (!future.isReady()) {
LOG(ERROR) << "Failed to update resources for container " << containerId
<< " of executor '" << executorId
<< "' of framework " << frameworkId
<< ", destroying container: "
<< (future.isFailed() ? future.failure() : "discarded");
containerizer->destroy(containerId);
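// Record why the container is being destroyed; these details are intended
// to be surfaced in the status updates generated when the executor's
// termination is processed.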
Executor* executor = getExecutor(frameworkId, executorId);
if (executor != NULL) {
containerizer::Termination termination;
termination.set_state(TASK_LOST);
termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
termination.set_message(
"Failed to update resources for container: " +
(future.isFailed() ? future.failure() : "discarded"));
executor->pendingTermination = termination;
// TODO(jieyu): Set executor->state to be TERMINATING.
}
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring sending queued tasks " << taskIds
<< " to executor '" << executorId
<< "' of framework " << frameworkId
<< " because the framework does not exist";
return;
}
// No need to send the task to the executor because the framework is
// being shut down. There is also no need to send a status update for
// the task because the framework is terminating!
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring sending queued tasks " << taskIds
<< " to executor '" << executorId
<< "' of framework " << frameworkId
<< " because the framework is terminating";
return;
}
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Ignoring sending queued tasks " << taskIds
<< " to executor '" << executorId
<< "' of framework " << frameworkId
<< " because the executor does not exist";
return;
}
// This is the case where the original instance of the executor has
// been shut down and a new instance has been brought up. There is no
// need to send a status update either, because one should have already
// been sent when the original instance of the executor was shutting down.
if (executor->containerId != containerId) {
LOG(WARNING) << "Ignoring sending queued tasks '" << taskIds
<< " to executor " << *executor
<< " because the target container " << containerId
<< " has exited";
return;
}
CHECK(executor->state == Executor::RUNNING ||
executor->state == Executor::TERMINATING ||
executor->state == Executor::TERMINATED)
<< executor->state;
// No need to send the task to the executor because the executor is
// terminating or has terminated. There is also no need to send a status
// update for the task because it will be properly handled by
// 'executorTerminated'.
if (executor->state != Executor::RUNNING) {
LOG(WARNING) << "Ignoring sending queued tasks " << taskIds
<< " to executor " << *executor
<< " because the executor is in "
<< executor->state << " state";
return;
}
foreach (const TaskInfo& task, tasks) {
// This is the case where the task is killed. No need to send
// status update because it should be handled in 'killTask'.
if (!executor->queuedTasks.contains(task.task_id())) {
LOG(WARNING) << "Ignoring sending queued task '" << task.task_id()
<< "' to executor " << *executor
<< " because the task has been killed";
continue;
}
executor->queuedTasks.erase(task.task_id());
// Add the task and send it to the executor.
executor->addTask(task);
LOG(INFO) << "Sending queued task '" << task.task_id()
<< "' to executor " << *executor;
RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
message.mutable_task()->MergeFrom(task);
// Note that 0.23.x executors require the 'pid' to be set
// to decode the message, but do not use the field.
message.set_pid(framework->pid.getOrElse(UPID()));
executor->send(message);
}
}
void Slave::killTask(
const UPID& from,
const FrameworkID& frameworkId,
const TaskID& taskId)
{
if (master != from) {
LOG(WARNING) << "Ignoring kill task message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
// TODO(bmahler): Also ignore if we're DISCONNECTED.
if (state == RECOVERING || state == TERMINATING) {
LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
<< " because the slave is " << state;
// TODO(vinod): Consider sending a TASK_LOST here.
// Currently it is tricky because 'statusUpdate()'
// ignores updates for unknown frameworks.
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring kill task " << taskId
<< " of framework " << frameworkId
<< " because no such framework is running";
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
// We don't send a status update here because a terminating
// framework cannot send acknowledgements.
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring kill task " << taskId
<< " of framework " << frameworkId
<< " because the framework is terminating";
return;
}
foreachkey (const ExecutorID& executorId, framework->pending) {
if (framework->pending[executorId].contains(taskId)) {
LOG(WARNING) << "Killing task " << taskId
<< " of framework " << frameworkId
<< " before it was launched";
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
taskId,
TASK_KILLED,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Task killed before it was launched");
statusUpdate(update, UPID());
framework->pending[executorId].erase(taskId);
if (framework->pending[executorId].empty()) {
framework->pending.erase(executorId);
if (framework->pending.empty() && framework->executors.empty()) {
removeFramework(framework);
}
}
return;
}
}
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
<< " because no corresponding executor is running";
// We send a TASK_LOST update because this task has never
// been launched on this slave.
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
taskId,
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Cannot find executor",
TaskStatus::REASON_EXECUTOR_TERMINATED);
statusUpdate(update, UPID());
return;
}
switch (executor->state) {
case Executor::REGISTERING: {
// The executor hasn't registered yet.
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
taskId,
TASK_KILLED,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Unregistered executor",
TaskStatus::REASON_EXECUTOR_UNREGISTERED,
executor->id);
// NOTE: Sending a terminal update (TASK_KILLED) removes the
// task from 'executor->queuedTasks', so that if the executor
// registers at a later point in time, it won't get this task.
statusUpdate(update, UPID());
// TODO(jieyu): Here, we kill the executor if it no longer has
// any task to run and has not yet registered. This is a
// workaround for those single-task executors that do not have
// proper self-terminating logic when they haven't received the
// task within a timeout.
if (executor->queuedTasks.empty()) {
CHECK(executor->launchedTasks.empty())
<< " Unregistered executor '" << executor->id
<< "' has launched tasks";
LOG(WARNING) << "Killing the unregistered executor " << *executor
<< " because it has no tasks";
executor->state = Executor::TERMINATING;
containerizer->destroy(executor->containerId);
}
break;
}
case Executor::TERMINATING:
case Executor::TERMINATED:
LOG(WARNING) << "Ignoring kill task " << taskId
<< " because the executor " << *executor
<< " is terminating/terminated";
break;
case Executor::RUNNING: {
if (executor->queuedTasks.contains(taskId)) {
// This is the case where the task has not yet been sent to
// the executor (e.g., waiting for containerizer update to
// finish).
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
taskId,
TASK_KILLED,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Task killed when it was queued",
None(),
executor->id);
// NOTE: Sending a terminal update (TASK_KILLED) removes the
// task from 'executor->queuedTasks'.
statusUpdate(update, UPID());
} else {
// Send a message to the executor and wait for
// it to send us a status update.
KillTaskMessage message;
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_task_id()->MergeFrom(taskId);
executor->send(message);
}
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
// TODO(benh): Consider sending a boolean that specifies if the
// shut down should be graceful or immediate. Likewise, consider
// sending back a shut down acknowledgement, because otherwise you
// could get into a state where a shut down was sent, dropped, and
// therefore never processed.
void Slave::shutdownFramework(
const UPID& from,
const FrameworkID& frameworkId)
{
// Allow shutdownFramework() only if
// it's called directly (e.g., Slave::finalize()) or
// it's a message from the currently registered master.
if (from && master != from) {
LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
<< " from " << from
<< " because it is not from the registered master ("
<< (master.isSome() ? stringify(master.get()) : "None") << ")";
return;
}
LOG(INFO) << "Asked to shut down framework " << frameworkId
<< " by " << from;
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state == RECOVERING || state == DISCONNECTED) {
LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
<< " because the slave has not yet registered with the master";
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Cannot shut down unknown framework " << frameworkId;
return;
}
switch (framework->state) {
case Framework::TERMINATING:
LOG(WARNING) << "Ignoring shutdown framework " << framework->id()
<< " because it is terminating";
break;
case Framework::RUNNING:
LOG(INFO) << "Shutting down framework " << framework->id();
framework->state = Framework::TERMINATING;
// Shut down all executors of this framework.
// NOTE: We use 'executors.keys()' here because 'shutdownExecutor'
// and 'removeExecutor' can remove an executor from 'executors'.
foreach (const ExecutorID& executorId, framework->executors.keys()) {
Executor* executor = framework->executors[executorId];
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING ||
executor->state == Executor::TERMINATING ||
executor->state == Executor::TERMINATED)
<< executor->state;
if (executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING) {
_shutdownExecutor(framework, executor);
} else if (executor->state == Executor::TERMINATED) {
// NOTE: We call remove here to ensure we can remove an
// executor (of a terminating framework) that is terminated
// but waiting for acknowledgements.
removeExecutor(framework, executor);
} else {
// Executor is terminating. Ignore.
}
}
// Remove this framework if it has no pending executors and tasks.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
break;
default:
LOG(FATAL) << "Framework " << frameworkId
<< " is in unexpected state " << framework->state;
break;
}
}
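// Delivers a message sent by a framework (via the master) to one of
// its executors, dropping the message if the slave, framework, or
// executor is not in a state to receive it.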
void Slave::schedulerMessage(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const string& data)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state != RUNNING) {
LOG(WARNING) << "Dropping message from framework " << frameworkId
<< " because the slave is in " << state << " state";
metrics.invalid_framework_messages++;
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Dropping message from framework " << frameworkId
<< " because framework does not exist";
metrics.invalid_framework_messages++;
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Dropping message from framework " << frameworkId
<< " because framework is terminating";
metrics.invalid_framework_messages++;
return;
}
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Dropping message for executor " << executorId
<< " because executor does not exist";
metrics.invalid_framework_messages++;
return;
}
switch (executor->state) {
case Executor::REGISTERING:
case Executor::TERMINATING:
case Executor::TERMINATED:
// TODO(*): If executor is not yet registered, queue framework
// message? It's probably okay to just drop it since frameworks
// can have the executor send a message to the master to say when
// it's ready.
LOG(WARNING) << "Dropping message for executor " << *executor
<< " because executor is not running";
metrics.invalid_framework_messages++;
break;
case Executor::RUNNING: {
FrameworkToExecutorMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
message.set_data(data);
executor->send(message);
metrics.valid_framework_messages++;
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
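// Updates the framework's scheduler pid (checkpointing it if the
// framework has checkpointing enabled) and tells the status update
// manager to resume sending pending updates.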
void Slave::updateFramework(
const FrameworkID& frameworkId,
const UPID& pid)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state != RUNNING) {
LOG(WARNING) << "Dropping updateFramework message for " << frameworkId
<< " because the slave is in " << state << " state";
metrics.invalid_framework_messages++;
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
<< " because it does not exist";
return;
}
switch (framework->state) {
case Framework::TERMINATING:
LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
<< " because it is terminating";
break;
case Framework::RUNNING: {
LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
if (pid == UPID()) {
framework->pid = None();
} else {
framework->pid = pid;
}
if (framework->info.checkpoint()) {
// Checkpoint the framework pid. Note that when the 'pid'
// is None, we checkpoint a default UPID() because
// 0.23.x slaves consider a missing pid file to be an
// error.
const string path = paths::getFrameworkPidPath(
metaDir, info.id(), frameworkId);
VLOG(1) << "Checkpointing framework pid"
<< " '" << framework->pid.getOrElse(UPID()) << "'"
<< " to '" << path << "'";
CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID())));
}
// Inform status update manager to immediately resend any pending
// updates.
statusUpdateManager->resume();
break;
}
default:
LOG(FATAL) << "Framework " << framework->id()
<< " is in unexpected state " << framework->state;
break;
}
}
void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
{
// TODO(jieyu): Here we assume that CheckpointResourcesMessages are
// ordered (i.e., slave receives them in the same order master sends
// them). This should be true in most cases because TCP
// enforces in-order delivery per connection. However, the ordering
// is technically not guaranteed because master creates multiple
// connections to the slave in some cases (e.g., persistent socket
// to slave breaks and master uses ephemeral socket). This could
// potentially be solved by using a version number and rejecting
// stale messages according to the version number.
//
// If CheckpointResourcesMessages are delivered out-of-order, there
// are two cases to consider:
// (1) If master does not fail over, it will reconcile the state
// with the slave if the framework later changes the
// checkpointed resources. Since master is the source of truth
// for reservations, the inconsistency is not exposed to
// frameworks.
// (2) If master does fail over, the slave will inform the new
// master about the incorrect checkpointed resources. When that
// happens, we expect frameworks to reconcile based on the
// offers they get.
Resources newCheckpointedResources = _checkpointedResources;
CHECK_SOME(state::checkpoint(
paths::getResourcesInfoPath(metaDir),
newCheckpointedResources))
<< "Failed to checkpoint resources " << newCheckpointedResources;
// Creates persistent volumes that do not exist and schedules
// releasing those persistent volumes that are no longer needed.
//
// TODO(jieyu): Consider introducing a volume manager once we start
// to support multiple disks, or raw disks. Depending on the
// DiskInfo, we may want to create either directories under a root
// directory, or LVM volumes from a given device.
Resources volumes = newCheckpointedResources.persistentVolumes();
foreach (const Resource& volume, volumes) {
// This is validated in master.
CHECK_NE(volume.role(), "*");
string path = paths::getPersistentVolumePath(flags.work_dir, volume);
if (!os::exists(path)) {
CHECK_SOME(os::mkdir(path, true))
<< "Failed to create persistent volume at '" << path << "'";
}
}
// TODO(jieyu): Schedule gc for released persistent volumes. We need
// to consider dynamic reservation here because the framework can
// release a dynamic reservation while still wanting to keep the
// persistent volume.
LOG(INFO) << "Updated checkpointed resources from "
<< checkpointedResources << " to "
<< newCheckpointedResources;
checkpointedResources = newCheckpointedResources;
}
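// Handles a status update acknowledgement coming from the leading
// master or, for backwards compatibility, from the scheduler driver
// (see the notes below), and forwards it to the status update manager.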
void Slave::statusUpdateAcknowledgement(
const UPID& from,
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const TaskID& taskId,
const string& uuid)
{
// Originally, all status update acknowledgements were sent from the
// scheduler driver. We'd like to have all acknowledgements sent by
// the master instead. See: MESOS-1389.
// For now, we handle acknowledgements from the leading master and
// from the scheduler driver, for backwards compatibility.
// TODO(bmahler): Aim to have the scheduler driver no longer
// sending acknowledgements in 0.20.0. Stop handling those messages
// here in 0.21.0.
// NOTE: We must reject those acknowledgements coming from
// non-leading masters because we may have already sent the terminal
// un-acknowledged task to the leading master! Unfortunately, the
// master's pid will not change across runs on the same machine, so
// we may process a message from the old master on the same machine,
// but this is a more general problem!
if (strings::startsWith(from.id, "master")) {
if (state != RUNNING) {
LOG(WARNING) << "Dropping status update acknowledgement message for "
<< frameworkId << " because the slave is in "
<< state << " state";
return;
}
if (master != from) {
LOG(WARNING) << "Ignoring status update acknowledgement message from "
<< from << " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
}
statusUpdateManager->acknowledgement(
taskId, frameworkId, UUID::fromBytes(uuid))
.onAny(defer(self(),
&Slave::_statusUpdateAcknowledgement,
lambda::_1,
taskId,
frameworkId,
UUID::fromBytes(uuid)));
}
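// Continuation invoked once the status update manager has handled the
// acknowledgement: marks the task completed once it is terminal and
// all of its updates have been acknowledged, then removes the executor
// and/or framework if nothing else is pending.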
void Slave::_statusUpdateAcknowledgement(
const Future<bool>& future,
const TaskID& taskId,
const FrameworkID& frameworkId,
const UUID& uuid)
{
// The future could fail if this is a duplicate status update acknowledgement.
if (!future.isReady()) {
LOG(ERROR) << "Failed to handle status update acknowledgement (UUID: "
<< uuid << ") for task " << taskId
<< " of framework " << frameworkId << ": "
<< (future.isFailed() ? future.failure() : "future discarded");
return;
}
VLOG(1) << "Status update manager successfully handled status update"
<< " acknowledgement (UUID: " << uuid
<< ") for task " << taskId
<< " of framework " << frameworkId;
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(ERROR) << "Status update acknowledgement (UUID: " << uuid
<< ") for task " << taskId
<< " of unknown framework " << frameworkId;
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
// Find the executor that has this update.
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
LOG(ERROR) << "Status update acknowledgement (UUID: " << uuid
<< ") for task " << taskId
<< " of unknown executor";
return;
}
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING ||
executor->state == Executor::TERMINATING ||
executor->state == Executor::TERMINATED)
<< executor->state;
// If the task has reached terminal state and all its updates have
// been acknowledged, mark it completed.
if (executor->terminatedTasks.contains(taskId) && !future.get()) {
executor->completeTask(taskId);
}
// Remove the executor if it has terminated and there are no more
// incomplete tasks.
if (executor->state == Executor::TERMINATED && !executor->incompleteTasks()) {
removeExecutor(framework, executor);
}
// Remove this framework if it has no pending executors and tasks.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
}
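// Handles a Subscribe call from an HTTP based executor: saves the
// connection, replays unacknowledged status updates, transitions tasks
// the executor never received to TASK_LOST, and asks the containerizer
// to update the container's resources before (re)sending queued tasks.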
void Slave::subscribe(
HttpConnection http,
const Call::Subscribe& subscribe,
Framework* framework,
Executor* executor)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
LOG(INFO) << "Received Subscribe request for HTTP executor " << *executor;
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state == TERMINATING) {
LOG(WARNING) << "Shutting down executor " << *executor << " as the slave "
<< "is terminating";
http.send(ShutdownExecutorMessage());
http.close();
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Shutting down executor " << *executor << " as the "
<< "framework is terminating";
http.send(ShutdownExecutorMessage());
http.close();
return;
}
switch (executor->state) {
case Executor::TERMINATING:
case Executor::TERMINATED:
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
LOG(WARNING) << "Shutting down executor " << *executor
<< " because it is in unexpected state " << executor->state;
http.send(ShutdownExecutorMessage());
http.close();
break;
case Executor::RUNNING:
case Executor::REGISTERING: {
// Close the earlier connection if one existed. This can even
// be a retried Subscribe request from an already connected
// executor.
if (executor->http.isSome()) {
LOG(WARNING) << "Closing already existing HTTP connection from "
<< "executor " << *executor;
executor->http->close();
}
executor->state = Executor::RUNNING;
// Save the connection for the executor.
executor->http = http;
executor->pid = None();
if (framework->info.checkpoint()) {
// Write a marker file to indicate that this executor
// is HTTP based.
const string path = paths::getExecutorHttpMarkerPath(
metaDir,
info.id(),
framework->id(),
executor->id,
executor->containerId);
LOG(INFO) << "Creating a marker file for HTTP based executor "
<< *executor << " at path '" << path << "'";
CHECK_SOME(os::touch(path));
}
// Tell executor it's registered and give it any queued tasks.
ExecutorRegisteredMessage message;
message.mutable_executor_info()->MergeFrom(executor->info);
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_framework_info()->MergeFrom(framework->info);
message.mutable_slave_id()->MergeFrom(info.id());
message.mutable_slave_info()->MergeFrom(info);
executor->send(message);
// Handle all the pending updates.
// The status update manager might have already checkpointed some
// of these pending updates (for example, if the slave died right
// after it checkpointed the update but before it could send the
// ACK to the executor). This is ok because the status update
// manager correctly handles duplicate updates.
foreach (const Call::Update& update, subscribe.unacknowledged_updates()) {
// NOTE: This also updates the executor's resources!
statusUpdate(protobuf::createStatusUpdate(
framework->id(),
update.status(),
info.id()),
None());
}
// Update the resource limits for the container. Note that the
// resource limits include the currently queued tasks because we
// want the container to have enough resources to hold the
// upcoming tasks.
Resources resources = executor->resources;
// TODO(jieyu): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (const TaskInfo& task, executor->queuedTasks.values()) {
resources += task.resources();
}
containerizer->update(executor->containerId, resources)
.onAny(defer(self(),
&Self::runTasks,
lambda::_1,
framework->id(),
executor->id,
executor->containerId,
executor->queuedTasks.values()));
hashmap<TaskID, TaskInfo> unackedTasks;
foreach (const TaskInfo& task, subscribe.unacknowledged_tasks()) {
unackedTasks[task.task_id()] = task;
}
// Now, if there is any task still in STAGING state and not in
// unacknowledged 'tasks' known to the executor, the slave must
// have died before the executor received the task! We should
// transition it to TASK_LOST. We only consider/store
// unacknowledged 'tasks' at the executor driver because if a
// task has been acknowledged, the slave must have received
// an update for that task and transitioned it out of STAGING!
// TODO(vinod): Consider checkpointing 'TaskInfo' instead of
// 'Task' so that we can relaunch such tasks! Currently we
// don't do it because 'TaskInfo.data' could be huge.
// TODO(vinod): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_STAGING &&
!unackedTasks.contains(task->task_id())) {
LOG(INFO) << "Transitioning STAGED task " << task->task_id()
<< " to LOST because it is unknown to the executor "
<< executor->id;
const StatusUpdate update = protobuf::createStatusUpdate(
framework->id(),
info.id(),
task->task_id(),
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Task launched during slave restart",
TaskStatus::REASON_SLAVE_RESTARTED,
executor->id);
statusUpdate(update, UPID());
}
}
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
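// Handles the initial registration of a driver based executor:
// checkpoints its libprocess pid if checkpointing is enabled, sends
// the ExecutorRegisteredMessage, and asks the containerizer to update
// the container's resources before running any queued tasks.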
void Slave::registerExecutor(
const UPID& from,
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
LOG(INFO) << "Got registration for executor '" << executorId
<< "' of framework " << frameworkId << " from "
<< stringify(from);
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state == RECOVERING) {
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' of framework " << frameworkId
<< " because the slave is still recovering";
reply(ShutdownExecutorMessage());
return;
}
if (state == TERMINATING) {
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' of framework " << frameworkId
<< " because the slave is terminating";
reply(ShutdownExecutorMessage());
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' as the framework " << frameworkId
<< " does not exist";
reply(ShutdownExecutorMessage());
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' as the framework " << frameworkId
<< " is terminating";
reply(ShutdownExecutorMessage());
return;
}
Executor* executor = framework->getExecutor(executorId);
// Check the status of the executor.
if (executor == NULL) {
LOG(WARNING) << "Unexpected executor '" << executorId
<< "' registering for framework " << frameworkId;
reply(ShutdownExecutorMessage());
return;
}
switch (executor->state) {
case Executor::TERMINATING:
case Executor::TERMINATED:
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
case Executor::RUNNING:
LOG(WARNING) << "Shutting down executor " << *executor
<< " because it is in unexpected state " << executor->state;
reply(ShutdownExecutorMessage());
break;
case Executor::REGISTERING: {
executor->state = Executor::RUNNING;
// Save the pid for the executor.
executor->pid = from;
link(from);
if (framework->info.checkpoint()) {
// TODO(vinod): This checkpointing should be done
// asynchronously as it is in the fast path of the slave!
// Checkpoint the libprocess pid.
string path = paths::getLibprocessPidPath(
metaDir,
info.id(),
executor->frameworkId,
executor->id,
executor->containerId);
VLOG(1) << "Checkpointing executor pid '"
<< executor->pid.get() << "' to '" << path << "'";
CHECK_SOME(state::checkpoint(path, executor->pid.get()));
}
// Tell executor it's registered and give it any queued tasks.
ExecutorRegisteredMessage message;
message.mutable_executor_info()->MergeFrom(executor->info);
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_framework_info()->MergeFrom(framework->info);
message.mutable_slave_id()->MergeFrom(info.id());
message.mutable_slave_info()->MergeFrom(info);
executor->send(message);
// Update the resource limits for the container. Note that the
// resource limits include the currently queued tasks because we
// want the container to have enough resources to hold the
// upcoming tasks.
Resources resources = executor->resources;
// TODO(jieyu): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (const TaskInfo& task, executor->queuedTasks.values()) {
resources += task.resources();
}
containerizer->update(executor->containerId, resources)
.onAny(defer(self(),
&Self::runTasks,
lambda::_1,
frameworkId,
executorId,
executor->containerId,
executor->queuedTasks.values()));
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
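// Handles re-registration of a driver based executor while the slave
// is recovering: replays the executor's unacknowledged status updates
// and transitions tasks the executor never received to TASK_LOST.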
void Slave::reregisterExecutor(
const UPID& from,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const vector<TaskInfo>& tasks,
const vector<StatusUpdate>& updates)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state != RECOVERING) {
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' of framework " << frameworkId
<< " because the slave is not in recovery mode";
reply(ShutdownExecutorMessage());
return;
}
LOG(INFO) << "Re-registering executor '" << executorId
<< "' of framework " << frameworkId;
CHECK(frameworks.contains(frameworkId))
<< "Unknown framework " << frameworkId;
Framework* framework = frameworks[frameworkId];
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Shutting down executor '" << executorId
<< "' as the framework " << frameworkId
<< " is terminating";
reply(ShutdownExecutorMessage());
return;
}
Executor* executor = framework->getExecutor(executorId);
CHECK_NOTNULL(executor);
switch (executor->state) {
case Executor::TERMINATING:
case Executor::TERMINATED:
// TERMINATED is possible if the executor forks, the parent process
// terminates and the child process (driver) tries to register!
case Executor::RUNNING:
LOG(WARNING) << "Shutting down executor " << *executor
<< " because it is in unexpected state " << executor->state;
reply(ShutdownExecutorMessage());
break;
case Executor::REGISTERING: {
executor->state = Executor::RUNNING;
executor->pid = from; // Update the pid.
link(from);
// Send re-registration message to the executor.
ExecutorReregisteredMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
message.mutable_slave_info()->MergeFrom(info);
send(executor->pid.get(), message);
// Handle all the pending updates.
// The status update manager might have already checkpointed some
// of these pending updates (for example, if the slave died right
// after it checkpointed the update but before it could send the
// ACK to the executor). This is ok because the status update
// manager correctly handles duplicate updates.
foreach (const StatusUpdate& update, updates) {
// NOTE: This also updates the executor's resources!
statusUpdate(update, executor->pid.get());
}
// Tell the containerizer to update the resources.
containerizer->update(executor->containerId, executor->resources)
.onAny(defer(self(),
&Self::_reregisterExecutor,
lambda::_1,
frameworkId,
executorId,
executor->containerId));
hashmap<TaskID, TaskInfo> unackedTasks;
foreach (const TaskInfo& task, tasks) {
unackedTasks[task.task_id()] = task;
}
// Now, if there is any task still in STAGING state and not in
// unacknowledged 'tasks' known to the executor, the slave must
// have died before the executor received the task! We should
// transition it to TASK_LOST. We only consider/store
// unacknowledged 'tasks' at the executor driver because if a
// task has been acknowledged, the slave must have received
// an update for that task and transitioned it out of STAGING!
// TODO(vinod): Consider checkpointing 'TaskInfo' instead of
// 'Task' so that we can relaunch such tasks! Currently we
// don't do it because 'TaskInfo.data' could be huge.
// TODO(vinod): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_STAGING &&
!unackedTasks.contains(task->task_id())) {
LOG(INFO) << "Transitioning STAGED task " << task->task_id()
<< " to LOST because it is unknown to the executor '"
<< executorId << "'";
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
task->task_id(),
TASK_LOST,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Task launched during slave restart",
TaskStatus::REASON_SLAVE_RESTARTED,
executorId);
statusUpdate(update, UPID());
}
}
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
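// Continuation for reregisterExecutor(): if the containerizer failed
// to update the container's resources, destroy the container and
// record a pending termination for the executor.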
void Slave::_reregisterExecutor(
const Future<Nothing>& future,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId)
{
if (!future.isReady()) {
LOG(ERROR) << "Failed to update resources for container " << containerId
<< " of executor '" << executorId
<< "' of framework " << frameworkId
<< ", destroying container: "
<< (future.isFailed() ? future.failure() : "discarded");
containerizer->destroy(containerId);
Executor* executor = getExecutor(frameworkId, executorId);
if (executor != NULL) {
containerizer::Termination termination;
termination.set_state(TASK_LOST);
termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
termination.set_message(
"Failed to update resources for container: " +
(future.isFailed() ? future.failure() : "discarded"));
executor->pendingTermination = termination;
// TODO(jieyu): Set executor->state to be TERMINATING.
}
}
}
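// Invoked when the executor re-registration timeout expires during
// recovery: destroys the containers of executors that never
// re-registered and then signals the end of recovery.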
void Slave::reregisterExecutorTimeout()
{
CHECK(state == RECOVERING || state == TERMINATING) << state;
LOG(INFO) << "Cleaning up un-reregistered executors";
foreachvalue (Framework* framework, frameworks) {
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
foreachvalue (Executor* executor, framework->executors) {
switch (executor->state) {
case Executor::RUNNING: // Executor re-registered.
case Executor::TERMINATING:
case Executor::TERMINATED:
break;
case Executor::REGISTERING: {
// If we are here, the executor must have hung and not
// exited! This is because if the executor properly exited,
// it should have already been identified by the isolator
// (via the reaper) and cleaned up!
LOG(INFO) << "Killing un-reregistered executor " << *executor;
containerizer->destroy(executor->containerId);
executor->state = Executor::TERMINATING;
containerizer::Termination termination;
termination.set_state(TASK_LOST);
termination.add_reasons(
TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT);
termination.set_message(
"Executor did not re-register within " +
stringify(EXECUTOR_REREGISTER_TIMEOUT));
executor->pendingTermination = termination;
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
}
// Signal the end of recovery.
recovered.set(Nothing());
}
// This can be called in two ways:
// 1) When a status update from the executor is received.
// 2) When the slave generates task updates (e.g., LOST/KILLED/FAILED).
// NOTE: We set the pid in 'Slave::__statusUpdate()' to 'pid' so that
// whoever sent this update will get an ACK. This is important because
// we allow executors to send updates for tasks that belong to other
// executors. Currently we allow this because we cannot guarantee
// reliable delivery of status updates. Since the executor driver caches
// unacked updates, it is important that whoever sent the update gets
// an acknowledgement for it.
void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
{
LOG(INFO) << "Handling status update " << update
<< (pid.isSome() ? " from " + stringify(pid.get()) : "");
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (!update.has_uuid()) {
LOG(WARNING) << "Ignoring status update " << update << " without 'uuid'";
metrics.invalid_status_updates++;
return;
}
// TODO(bmahler): With the HTTP API, we must validate the UUID
// inside the TaskStatus. For now, we ensure that the uuid of task
// status matches the update's uuid, in case the executor is using
// a pre-0.23.x driver.
update.mutable_status()->set_uuid(update.uuid());
// Set the source and UUID before forwarding the status update.
update.mutable_status()->set_source(
pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
// Set TaskStatus.executor_id if not already set; overwrite existing
// value if already set.
if (update.has_executor_id()) {
if (update.status().has_executor_id() &&
update.status().executor_id() != update.executor_id()) {
LOG(WARNING) << "Executor ID mismatch in status update"
<< (pid.isSome() ? " from " + stringify(pid.get()) : "")
<< "; overwriting received '"
<< update.status().executor_id() << "' with expected'"
<< update.executor_id() << "'";
}
update.mutable_status()->mutable_executor_id()->CopyFrom(
update.executor_id());
}
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
LOG(WARNING) << "Ignoring status update " << update
<< " for unknown framework " << update.framework_id();
metrics.invalid_status_updates++;
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
// We don't send update when a framework is terminating because
// it cannot send acknowledgements.
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring status update " << update
<< " for terminating framework " << framework->id();
metrics.invalid_status_updates++;
return;
}
if (HookManager::hooksAvailable()) {
// Even though the hook(s) return a TaskStatus, we only use two fields:
// container_status and labels. Remaining fields are discarded.
TaskStatus statusFromHooks =
HookManager::slaveTaskStatusDecorator(
update.framework_id(), update.status());
if (statusFromHooks.has_labels()) {
update.mutable_status()->mutable_labels()->CopyFrom(
statusFromHooks.labels());
}
if (statusFromHooks.has_container_status()) {
update.mutable_status()->mutable_container_status()->CopyFrom(
statusFromHooks.container_status());
}
}
// Fill in the container IP address with the IP from the agent PID, if not
// already filled in.
// TODO(karya): Fill in the IP address by looking up the executor PID.
ContainerStatus* containerStatus =
update.mutable_status()->mutable_container_status();
if (containerStatus->network_infos().size() == 0) {
NetworkInfo* networkInfo = containerStatus->add_network_infos();
// TODO(CD): Deprecated -- Remove after 0.27.0.
networkInfo->set_ip_address(stringify(self().address.ip));
NetworkInfo::IPAddress* ipAddress =
networkInfo->add_ip_addresses();
ipAddress->set_ip_address(stringify(self().address.ip));
}
const TaskStatus& status = update.status();
Executor* executor = framework->getExecutor(status.task_id());
if (executor == NULL) {
LOG(WARNING) << "Could not find the executor for "
<< "status update " << update;
metrics.valid_status_updates++;
// NOTE: We forward the update here because this update could be
// generated by the slave when the executor is unknown to it
// (e.g., killTask(), _runTask()) or sent by an executor for a
// task that belongs to another executor.
// We also end up here if 1) the previous slave died after
// checkpointing a _terminal_ update but before it could send an
// ACK to the executor AND 2) after recovery the status update
// manager successfully retried the update, got the ACK from the
// scheduler and cleaned up the stream before the executor
// re-registered. In this case, the slave cannot find the executor
// corresponding to this task because the task has been moved to
// 'Executor::completedTasks'.
statusUpdateManager->update(update, info.id())
.onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
return;
}
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING ||
executor->state == Executor::TERMINATING ||
executor->state == Executor::TERMINATED)
<< executor->state;
// Failing this validation on the executor driver used to cause the
// driver to abort. Now that the validation is done by the slave, it
// should shut down the executor to be consistent.
//
// TODO(arojas): Once the HTTP API is the default, return a
// 400 Bad Request response, indicating the reason in the body.
if (status.source() == TaskStatus::SOURCE_EXECUTOR &&
status.state() == TASK_STAGING) {
LOG(ERROR) << "Received TASK_STAGING from executor " << *executor
<< " which is not allowed. Shutting down the executor";
_shutdownExecutor(framework, executor);
return;
}
// TODO(vinod): Revisit these semantics when we disallow executors
// from sending updates for tasks that belong to other executors.
if (pid.isSome() &&
pid != UPID() &&
executor->pid.isSome() &&
executor->pid != pid) {
LOG(WARNING) << "Received status update " << update << " from " << pid.get()
<< " on behalf of a different executor '" << executor->id
<< "' (" << executor->pid.get() << ")";
}
metrics.valid_status_updates++;
// We set the latest state of the task here so that the slave can
// inform the master about the latest state (via status update or
// ReregisterSlaveMessage message) as soon as possible. Master can
// use this information, for example, to release resources as soon
// as the latest state of the task reaches a terminal state. This
// is important because status update manager queues updates and
// only sends one update per task at a time; the next update for a
// task is sent only after the acknowledgement for the previous one
// is received, which could take a long time if the framework is
// backed up or is down.
executor->updateTaskState(status);
// Handle the task appropriately if it is terminated.
// TODO(vinod): Revisit these semantics when we disallow duplicate
// terminal updates (e.g., when slave recovery is always enabled).
if (protobuf::isTerminalState(status.state()) &&
(executor->queuedTasks.contains(status.task_id()) ||
executor->launchedTasks.contains(status.task_id()))) {
executor->terminateTask(status.task_id(), status);
// Wait until the container's resources have been updated before
// sending the status update.
containerizer->update(executor->containerId, executor->resources)
.onAny(defer(self(),
&Slave::_statusUpdate,
lambda::_1,
update,
pid,
executor->id,
executor->containerId,
executor->checkpoint));
} else {
// Immediately send the status update.
_statusUpdate(None(),
update,
pid,
executor->id,
executor->containerId,
executor->checkpoint);
}
}
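// Continuation for statusUpdate(): destroys the container if updating
// its resources failed, then hands the update to the status update
// manager (checkpointing it if the framework enabled checkpointing).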
void Slave::_statusUpdate(
const Option<Future<Nothing>>& future,
const StatusUpdate& update,
const Option<UPID>& pid,
const ExecutorID& executorId,
const ContainerID& containerId,
bool checkpoint)
{
if (future.isSome() && !future->isReady()) {
LOG(ERROR) << "Failed to update resources for container " << containerId
<< " of executor '" << executorId
<< "' running task " << update.status().task_id()
<< " on status update for terminal task, destroying container: "
<< (future->isFailed() ? future->failure() : "discarded");
containerizer->destroy(containerId);
Executor* executor = getExecutor(update.framework_id(), executorId);
if (executor != NULL) {
containerizer::Termination termination;
termination.set_state(TASK_LOST);
termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
termination.set_message(
"Failed to update resources for container: " +
(future->isFailed() ? future->failure() : "discarded"));
executor->pendingTermination = termination;
// TODO(jieyu): Set executor->state to be TERMINATING.
}
}
if (checkpoint) {
// Ask the status update manager to checkpoint and reliably send the update.
statusUpdateManager->update(update, info.id(), executorId, containerId)
.onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
} else {
// Ask the status update manager to just retry the update.
statusUpdateManager->update(update, info.id())
.onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
}
}
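// Continuation invoked once the status update manager has handled the
// update: acknowledges whoever sent it, either via its libprocess pid
// or over the HTTP connection of an HTTP based executor.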
void Slave::__statusUpdate(
const Future<Nothing>& future,
const StatusUpdate& update,
const Option<UPID>& pid)
{
CHECK_READY(future) << "Failed to handle status update " << update;
VLOG(1) << "Status update manager successfully handled status update "
<< update;
if (pid == UPID()) {
return;
}
StatusUpdateAcknowledgementMessage message;
message.mutable_framework_id()->MergeFrom(update.framework_id());
message.mutable_slave_id()->MergeFrom(update.slave_id());
message.mutable_task_id()->MergeFrom(update.status().task_id());
message.set_uuid(update.uuid());
// Status update manager successfully handled the status update.
// Acknowledge the executor, if we have a valid pid.
if (pid.isSome()) {
LOG(INFO) << "Sending acknowledgement for status update " << update
<< " to " << pid.get();
send(pid.get(), message);
} else {
// Acknowledge the HTTP based executor.
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
LOG(WARNING) << "Ignoring sending acknowledgement for status update "
<< update << " of unknown framework";
return;
}
Executor* executor = framework->getExecutor(update.status().task_id());
if (executor == NULL) {
// Refer to the comments in 'statusUpdate()' on when this can
// happen.
LOG(WARNING) << "Ignoring sending acknowledgement for status update "
<< update << " of unknown executor";
return;
}
executor->send(message);
}
}
// NOTE: An acknowledgement for this update might have already been
// processed by the slave but not the status update manager.
void Slave::forward(StatusUpdate update)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state != RUNNING) {
LOG(WARNING) << "Dropping status update " << update
<< " sent by status update manager because the slave"
<< " is in " << state << " state";
return;
}
// Ensure that the task status uuid is set even if this update was sent by the
// status update manager after recovering a pre-0.23.x slave/executor driver's
// updates. This allows us to simplify the master code (in >= 0.27.0) to
// assume the uuid is always set for retryable updates.
CHECK(update.has_uuid())
<< "Expecting updates without 'uuid' to have been rejected";
update.mutable_status()->set_uuid(update.uuid());
// Update the status update state of the task and include the latest
// state of the task in the status update.
Framework* framework = getFramework(update.framework_id());
if (framework != NULL) {
const TaskID& taskId = update.status().task_id();
Executor* executor = framework->getExecutor(taskId);
if (executor != NULL) {
// NOTE: We do not look for the task in queued tasks because
// no update is expected for it until it's launched. Similarly,
// we do not look for completed tasks because the state for a
// completed task shouldn't be changed.
Task* task = NULL;
if (executor->launchedTasks.contains(taskId)) {
task = executor->launchedTasks[taskId];
} else if (executor->terminatedTasks.contains(taskId)) {
task = executor->terminatedTasks[taskId];
}
if (task != NULL) {
// We set the status update state of the task here because in
// steady state the master updates the status update state of the
// task when it receives this update. If the master fails over,
// the slave re-registers with this task in this status update
// state. Note that an acknowledgement for this update might
// be enqueued on status update manager when we are here. But
// that is ok because the status update state will be updated
// when the next update is forwarded to the slave.
task->set_status_update_state(update.status().state());
task->set_status_update_uuid(update.uuid());
// Include the latest state of task in the update. See the
// comments in 'statusUpdate()' on why informing the master
// about the latest state of the task is important.
update.set_latest_state(task->state());
}
}
}
CHECK_SOME(master);
LOG(INFO) << "Forwarding the update " << update << " to " << master.get();
// NOTE: We forward the update even if framework/executor/task
// doesn't exist because the status update manager will be expecting
// an acknowledgement for the update. This could happen for example
// if this is a retried terminal update and before we are here the
// slave has already processed the acknowledgement of the original
// update and removed the framework/executor/task. Also, slave
// re-registration can generate updates when framework/executor/task
// are unknown.
// Forward the update to master.
StatusUpdateMessage message;
message.mutable_update()->MergeFrom(update);
message.set_pid(self()); // The ACK will be first received by the slave.
send(master.get(), message);
}
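// Forwards a message from an executor to its framework, sending it
// directly to the scheduler driver when its pid is known and through
// the master otherwise.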
void Slave::executorMessage(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const string& data)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state != RUNNING) {
LOG(WARNING) << "Dropping framework message from executor '"
<< executorId << "' to framework " << frameworkId
<< " because the slave is in " << state << " state";
metrics.invalid_framework_messages++;
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Cannot send framework message from executor '"
<< executorId << "' to framework " << frameworkId
<< " because framework does not exist";
metrics.invalid_framework_messages++;
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring framework message from executor '"
<< executorId << "' to framework " << frameworkId
<< " because framework is terminating";
metrics.invalid_framework_messages++;
return;
}
ExecutorToFrameworkMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
message.set_data(data);
CHECK_SOME(master);
if (framework->pid.isSome()) {
LOG(INFO) << "Sending message for framework " << frameworkId
<< " to " << framework->pid.get();
send(framework->pid.get(), message);
} else {
LOG(INFO) << "Sending message for framework " << frameworkId
<< " through the master " << master.get();
send(master.get(), message);
}
metrics.valid_framework_messages++;
}
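// Handles a ping from the master: forces re-detection (and hence
// re-registration) if the master considers the slave disconnected,
// re-arms the ping timeout, and replies with a pong.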
void Slave::ping(const UPID& from, bool connected)
{
VLOG(1) << "Received ping from " << from;
if (!connected && state == RUNNING) {
// This could happen if there is a one-way partition between
// the master and slave, causing the master to get an exited
// event and marking the slave disconnected but the slave
// thinking it is still connected. Force a re-registration with
// the master to reconcile.
LOG(INFO) << "Master marked the slave as disconnected but the slave"
<< " considers itself registered! Forcing re-registration.";
detection.discard();
}
// If we don't get a ping from the master, trigger a
// re-registration. This can occur when the master no
// longer considers the slave to be registered, so it is
// essential for the slave to attempt a re-registration
// when this occurs.
Clock::cancel(pingTimer);
pingTimer = delay(
masterPingTimeout,
self(),
&Slave::pingTimeout,
detection);
send(from, PongSlaveMessage());
}
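// Invoked when the master ping timeout fires: forces re-detection of
// the master unless a newer ping has already re-armed the timer.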
void Slave::pingTimeout(Future<Option<MasterInfo>> future)
{
// It's possible that a new ping arrived since the timeout fired
// and we were unable to cancel this timeout. If this occurs, don't
// bother trying to re-detect.
if (pingTimer.timeout().expired()) {
LOG(INFO) << "No pings from master received within "
<< masterPingTimeout;
future.discard();
}
}
void Slave::exited(const UPID& pid)
{
LOG(INFO) << pid << " exited";
if (master.isNone() || master.get() == pid) {
LOG(WARNING) << "Master disconnected!"
<< " Waiting for a new master to be elected";
// TODO(benh): After so long waiting for a master, commit suicide.
}
}
Framework* Slave::getFramework(const FrameworkID& frameworkId)
{
if (frameworks.count(frameworkId) > 0) {
return frameworks[frameworkId];
}
return NULL;
}
Executor* Slave::getExecutor(
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
return framework->getExecutor(executorId);
}
return NULL;
}
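// Returns the ExecutorInfo to use for a task: the task's own executor
// if one is specified, otherwise a generated command executor that
// runs 'mesos-executor' to execute the task's CommandInfo.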
ExecutorInfo Slave::getExecutorInfo(
const FrameworkInfo& frameworkInfo,
const TaskInfo& task)
{
CHECK_NE(task.has_executor(), task.has_command())
<< "Task " << task.task_id()
<< " should have either CommandInfo or ExecutorInfo set but not both";
if (task.has_command()) {
ExecutorInfo executor;
// Command executors share the same id as the task.
executor.mutable_executor_id()->set_value(task.task_id().value());
executor.mutable_framework_id()->CopyFrom(frameworkInfo.id());
if (task.has_container()) {
// Store the container info in the executor info so it will
// be checkpointed. This allows the correct containerizer to
// recover this task on restart.
executor.mutable_container()->CopyFrom(task.container());
}
bool hasRootfs = task.has_container() &&
task.container().type() == ContainerInfo::MESOS &&
task.container().mesos().has_image();
if (hasRootfs) {
ContainerInfo* container = executor.mutable_container();
// For command-tasks, we are now copying the entire `task.container` into
// the `executorInfo`. Thus, `executor.container` now has the image if
// `task.container` had one. However, in case of rootfs, we want to run
// the command executor in the host filesystem and prepare/mount the image
// into the container as a volume (command executor will use pivot_root to
// mount the image). For this reason, we need to strip the image in
// `executor.container.mesos`.
container->mutable_mesos()->clear_image();
container->set_type(ContainerInfo::MESOS);
Volume* volume = container->add_volumes();
volume->mutable_image()->CopyFrom(task.container().mesos().image());
volume->set_container_path(COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
volume->set_mode(Volume::RW);
// We need to set the executor user as root as it needs to
// perform chroot (even when switch_user is set to false).
executor.mutable_command()->set_user("root");
}
// Prepare an executor name which includes information on the
// command being launched.
string name = "(Task: " + task.task_id().value() + ") ";
if (task.command().shell()) {
if (!task.command().has_value()) {
name += "(Command: NO COMMAND)";
} else {
name += "(Command: sh -c '";
if (task.command().value().length() > 15) {
name += task.command().value().substr(0, 12) + "...')";
} else {
name += task.command().value() + "')";
}
}
} else {
if (!task.command().has_value()) {
name += "(Command: NO EXECUTABLE)";
} else {
string args =
task.command().value() + ", " +
strings::join(", ", task.command().arguments());
if (args.length() > 15) {
name += "(Command: [" + args.substr(0, 12) + "...])";
} else {
name += "(Command: [" + args + "])";
}
}
}
executor.set_name("Command Executor " + name);
executor.set_source(task.task_id().value());
// Copy the [uris, environment, container, user] fields from the
// CommandInfo to get the URIs we need to download, the
// environment variables that should get set, the necessary
// container information, and the user to run the executor as but
// nothing else because we need to set up the rest of the executor
// command ourselves in order to invoke 'mesos-executor'.
executor.mutable_command()->mutable_uris()->MergeFrom(
task.command().uris());
if (task.command().has_environment()) {
executor.mutable_command()->mutable_environment()->MergeFrom(
task.command().environment());
}
// We skip setting the user for the command executor that has
// a rootfs image since we need root permissions to chroot.
// We assume the command executor will change to the correct user
// later on.
if (!hasRootfs && task.command().has_user()) {
executor.mutable_command()->set_user(task.command().user());
}
Result<string> path =
os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
// Explicitly set 'shell' to true since we want to use the shell
// for running the mesos-executor (and even though this is the
// default we want to be explicit).
executor.mutable_command()->set_shell(true);
if (path.isSome()) {
if (hasRootfs) {
executor.mutable_command()->set_shell(false);
executor.mutable_command()->add_arguments("mesos-executor");
executor.mutable_command()->add_arguments(
"--sandbox_directory=" + flags.sandbox_directory);
#ifndef __WINDOWS__
// NOTE: If the switch_user flag is false and the slave runs under
// a non-root user, the task will be rejected by the POSIX
// filesystem isolator. The Linux filesystem isolator requires the
// slave to have root permission.
if (flags.switch_user) {
Option<string> user;
if (task.command().has_user()) {
user = task.command().user();
} else if (frameworkInfo.has_user()) {
user = frameworkInfo.user();
}
if (user.isSome()) {
executor.mutable_command()->add_arguments(
"--user=" + user.get());
}
}
#endif // __WINDOWS__
}
executor.mutable_command()->set_value(path.get());
} else {
executor.mutable_command()->set_value(
"echo '" +
(path.isError() ? path.error() : "No such file or directory") +
"'; exit 1");
}
// Add an allowance for the command executor. This does lead to a
// small overcommit of resources.
// TODO(vinod): If a task is using revocable resources, mark the
// corresponding executor resource (e.g., cpus) to be also
// revocable. Currently, it is OK because the containerizer is
// given task + executor resources on task launch resulting in
// the container being correctly marked as revocable.
executor.mutable_resources()->MergeFrom(
Resources::parse(
"cpus:" + stringify(DEFAULT_EXECUTOR_CPUS) + ";" +
"mem:" + stringify(DEFAULT_EXECUTOR_MEM.megabytes())).get());
return executor;
}
return task.executor();
}
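// Invoked once the containerizer has attempted to launch the
// executor's container: registers the termination callback and
// destroys the container if the launch failed or the framework or
// executor has started terminating in the meantime.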
void Slave::executorLaunched(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId,
const Future<bool>& future)
{
// Set up callback for executor termination. Note that we do this
// regardless of whether or not we have successfully launched the
// executor because even if we failed to launch the executor the
// result of calling 'wait' will make sure everything gets properly
// cleaned up. Note that we do this here instead of where we do
// Containerizer::launch because we want to guarantee the contract
// with the Containerizer that we won't call 'wait' until after the
// launch has completed.
containerizer->wait(containerId)
.onAny(defer(self(),
&Self::executorTerminated,
frameworkId,
executorId,
lambda::_1));
if (!future.isReady()) {
LOG(ERROR) << "Container '" << containerId
<< "' for executor '" << executorId
<< "' of framework " << frameworkId
<< " failed to start: "
<< (future.isFailed() ? future.failure() : "future discarded");
++metrics.container_launch_errors;
containerizer->destroy(containerId);
Executor* executor = getExecutor(frameworkId, executorId);
if (executor != NULL) {
containerizer::Termination termination;
termination.set_state(TASK_FAILED);
termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
termination.set_message(
"Failed to launch container: " +
(future.isFailed() ? future.failure() : "discarded"));
executor->pendingTermination = termination;
// TODO(jieyu): Set executor->state to be TERMINATING.
}
return;
} else if (!future.get()) {
LOG(ERROR) << "Container '" << containerId
<< "' for executor '" << executorId
<< "' of framework " << frameworkId
<< " failed to start: None of the enabled containerizers ("
<< flags.containerizers << ") could create a container for the "
<< "provided TaskInfo/ExecutorInfo message";
++metrics.container_launch_errors;
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Framework '" << frameworkId
<< "' for executor '" << executorId
<< "' is no longer valid";
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Killing executor '" << executorId
<< "' of framework " << frameworkId
<< " because the framework is terminating";
containerizer->destroy(containerId);
return;
}
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Killing unknown executor '" << executorId
<< "' of framework " << frameworkId;
containerizer->destroy(containerId);
return;
}
switch (executor->state) {
case Executor::TERMINATING:
LOG(WARNING) << "Killing executor " << *executor
<< " because the executor is terminating";
containerizer->destroy(containerId);
break;
case Executor::REGISTERING:
case Executor::RUNNING:
break;
case Executor::TERMINATED:
default:
LOG(FATAL) << "Executor " << *executor << " is in an unexpected state "
<< executor->state;
break;
}
}
// Called by the isolator when an executor process terminates.
void Slave::executorTerminated(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const Future<containerizer::Termination>& termination)
{
int status;
// A termination failure indicates the containerizer could not destroy a
// container.
// TODO(idownes): This is a serious error so consider aborting the slave if
// this occurs.
if (!termination.isReady()) {
LOG(ERROR) << "Termination of executor '" << executorId
<< "' of framework " << frameworkId
<< " failed: "
<< (termination.isFailed()
? termination.failure()
: "discarded");
// Set a special status for failure.
status = -1;
} else if (!termination.get().has_status()) {
LOG(INFO) << "Executor '" << executorId
<< "' of framework " << frameworkId
<< " has terminated with unknown status";
// Set a special status for None.
status = -1;
} else {
status = termination.get().status();
LOG(INFO) << "Executor '" << executorId
<< "' of framework " << frameworkId << " "
<< WSTRINGIFY(status);
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Framework " << frameworkId
<< " for executor '" << executorId
<< "' does not exist";
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Executor '" << executorId
<< "' of framework " << frameworkId
<< " does not exist";
return;
}
switch (executor->state) {
case Executor::REGISTERING:
case Executor::RUNNING:
case Executor::TERMINATING: {
++metrics.executors_terminated;
executor->state = Executor::TERMINATED;
// Transition all live tasks to TASK_LOST/TASK_FAILED.
// If the containerizer killed the executor (e.g., due to an OOM event)
// or if this is a command executor, we send TASK_FAILED status updates
// instead of TASK_LOST.
// NOTE: We don't send updates if the framework is terminating
// because we don't want the status update manager to keep retrying
// these updates since it won't receive ACKs from the scheduler. Also,
// the status update manager should have already cleaned up all the
// status update streams for a framework that is terminating.
if (framework->state != Framework::TERMINATING) {
// Transition all live launched tasks.
// TODO(vinod): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (Task* task, executor->launchedTasks.values()) {
if (!protobuf::isTerminalState(task->state())) {
sendExecutorTerminatedStatusUpdate(
task->task_id(), termination, frameworkId, executor);
}
}
// Transition all queued tasks.
// TODO(vinod): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (const TaskInfo& task, executor->queuedTasks.values()) {
sendExecutorTerminatedStatusUpdate(
task.task_id(), termination, frameworkId, executor);
}
}
// Only send ExitedExecutorMessage if it is not a Command
// Executor because the master doesn't store them; they are
// generated by the slave.
// TODO(vinod): Reliably forward this message to the master.
if (!executor->isCommandExecutor()) {
ExitedExecutorMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
message.set_status(status);
if (master.isSome()) {
send(master.get(), message);
}
}
// Remove the executor if either the slave or framework is
// terminating or there are no incomplete tasks.
if (state == TERMINATING ||
framework->state == Framework::TERMINATING ||
!executor->incompleteTasks()) {
removeExecutor(framework, executor);
}
// Remove this framework if it has no pending executors and tasks.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
break;
}
default:
LOG(FATAL) << "Executor '" << executor->id
<< "' of framework " << framework->id()
<< " in unexpected state " << executor->state;
break;
}
}
void Slave::removeExecutor(Framework* framework, Executor* executor)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
LOG(INFO) << "Cleaning up executor " << *executor;
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
// Check that this executor has terminated.
CHECK(executor->state == Executor::TERMINATED) << executor->state;
// Check that either 1) the executor has no tasks with pending
// updates or 2) the slave/framework is terminating, because no
// acknowledgements might be received.
CHECK(!executor->incompleteTasks() ||
state == TERMINATING ||
framework->state == Framework::TERMINATING);
// Write a sentinel file to indicate that this executor
// is completed.
if (executor->checkpoint) {
const string path = paths::getExecutorSentinelPath(
metaDir,
info.id(),
framework->id(),
executor->id,
executor->containerId);
CHECK_SOME(os::touch(path));
}
// TODO(vinod): Move the responsibility of gc'ing to the
// Executor struct.
// Schedule the executor run work directory to get garbage collected.
const string path = paths::getExecutorRunPath(
flags.work_dir,
info.id(),
framework->id(),
executor->id,
executor->containerId);
os::utime(path); // Update the modification time.
garbageCollect(path)
.then(defer(self(), &Self::detachFile, path));
// Schedule the top level executor work directory, only if the
// framework doesn't have any 'pending' tasks for this executor.
if (!framework->pending.contains(executor->id)) {
const string path = paths::getExecutorPath(
flags.work_dir, info.id(), framework->id(), executor->id);
os::utime(path); // Update the modification time.
garbageCollect(path);
}
if (executor->checkpoint) {
// Schedule the executor run meta directory to get garbage collected.
const string path = paths::getExecutorRunPath(
metaDir,
info.id(),
framework->id(),
executor->id,
executor->containerId);
os::utime(path); // Update the modification time.
garbageCollect(path);
// Schedule the top level executor meta directory, only if the
// framework doesn't have any 'pending' tasks for this executor.
if (!framework->pending.contains(executor->id)) {
const string path = paths::getExecutorPath(
metaDir, info.id(), framework->id(), executor->id);
os::utime(path); // Update the modification time.
garbageCollect(path);
}
}
if (HookManager::hooksAvailable()) {
HookManager::slaveRemoveExecutorHook(framework->info, executor->info);
}
framework->destroyExecutor(executor->id);
}
void Slave::removeFramework(Framework* framework)
{
CHECK_NOTNULL(framework);
LOG(INFO)<< "Cleaning up framework " << framework->id();
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING);
// The invariant here is that a framework should not be removed
// if it has either pending executors or pending tasks.
CHECK(framework->executors.empty());
CHECK(framework->pending.empty());
// Close all status update streams for this framework.
statusUpdateManager->cleanup(framework->id());
// Schedule the framework work and meta directories for garbage
// collection.
// TODO(vinod): Move the responsibility of gc'ing to the
// Framework struct.
const string path = paths::getFrameworkPath(
flags.work_dir, info.id(), framework->id());
os::utime(path); // Update the modification time.
garbageCollect(path);
if (framework->info.checkpoint()) {
// Schedule the framework meta directory to get garbage collected.
const string path = paths::getFrameworkPath(
metaDir, info.id(), framework->id());
os::utime(path); // Update the modification time.
garbageCollect(path);
}
frameworks.erase(framework->id());
// Pass ownership of the framework pointer.
completedFrameworks.push_back(Owned<Framework>(framework));
if (state == TERMINATING && frameworks.empty()) {
terminate(self());
}
}
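// Handles a shutdown request for a specific executor. The request is only
// honored if it comes from the registered master (or locally), the slave
// has registered, and both the framework and the executor are still active.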
void Slave::shutdownExecutor(
const UPID& from,
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
if (from && master != from) {
LOG(WARNING) << "Ignoring shutdown executor message for executor '"
<< executorId << "' of framework " << frameworkId
<< " from " << from << " because it is not from the"
<< " registered master ("
<< (master.isSome() ? stringify(master.get()) : "None") << ")";
return;
}
LOG(INFO) << "Asked to shut down executor '" << executorId
<< "' of framework " << frameworkId << " by " << from;
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state == RECOVERING || state == DISCONNECTED) {
LOG(WARNING) << "Ignoring shutdown executor message for executor '"
<< executorId << "' of framework " << frameworkId
<< " because the slave has not yet registered with the master";
return;
}
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Cannot shut down executor '" << executorId
<< "' of unknown framework " << frameworkId;
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring shutdown executor '" << executorId
<< "' of framework " << frameworkId
<< " because the framework is terminating";
return;
}
if (!framework->executors.contains(executorId)) {
LOG(WARNING) << "Ignoring shutdown of unknown executor '" << executorId
<< "' of framework " << frameworkId;
return;
}
Executor* executor = framework->executors[executorId];
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING ||
executor->state == Executor::TERMINATING ||
executor->state == Executor::TERMINATED)
<< executor->state;
if (executor->state == Executor::TERMINATING ||
executor->state == Executor::TERMINATED) {
LOG(WARNING) << "Ignoring shutdown executor '" << executorId
<< "' of framework " << frameworkId
<< " because the executor is terminating/terminated";
return;
}
_shutdownExecutor(framework, executor);
}
void Slave::_shutdownExecutor(Framework* framework, Executor* executor)
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(executor);
LOG(INFO) << "Shutting down executor " << *executor;
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
CHECK(executor->state == Executor::REGISTERING ||
executor->state == Executor::RUNNING)
<< executor->state;
executor->state = Executor::TERMINATING;
// If the executor hasn't yet registered, this message
// will be dropped to the floor!
executor->send(ShutdownExecutorMessage());
// Prepare for sending a kill if the executor doesn't comply.
delay(flags.executor_shutdown_grace_period,
self(),
&Slave::shutdownExecutorTimeout,
framework->id(),
executor->id,
executor->containerId);
}
void Slave::shutdownExecutorTimeout(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId)
{
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(INFO) << "Framework " << frameworkId
<< " seems to have exited. Ignoring shutdown timeout"
<< " for executor '" << executorId << "'";
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
VLOG(1) << "Executor '" << executorId
<< "' of framework " << frameworkId
<< " seems to have exited. Ignoring its shutdown timeout";
return;
}
// Make sure this timeout is valid.
if (executor->containerId != containerId) {
LOG(INFO) << "A new executor " << *executor
<< " with run " << executor->containerId
<< " seems to be active. Ignoring the shutdown timeout"
<< " for the old executor run " << containerId;
return;
}
switch (executor->state) {
case Executor::TERMINATED:
LOG(INFO) << "Executor " << *executor << " has already terminated";
break;
case Executor::TERMINATING:
LOG(INFO) << "Killing executor " << *executor;
containerizer->destroy(executor->containerId);
break;
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
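// Invoked when the executor registration timeout fires. If the executor is
// still REGISTERING for the same container run, the container is destroyed
// and a pending termination with REASON_EXECUTOR_REGISTRATION_TIMEOUT is
// recorded; otherwise the timeout is ignored.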
void Slave::registerExecutorTimeout(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const ContainerID& containerId)
{
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(INFO) << "Framework " << frameworkId
<< " seems to have exited. Ignoring registration timeout"
<< " for executor '" << executorId << "'";
return;
}
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(INFO) << "Ignoring registration timeout for executor '" << executorId
<< "' because the framework " << frameworkId
<< " is terminating";
return;
}
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
VLOG(1) << "Executor '" << executorId
<< "' of framework " << frameworkId
<< " seems to have exited. Ignoring its registration timeout";
return;
}
if (executor->containerId != containerId) {
LOG(INFO) << "A new executor " << *executor
<< " with run " << executor->containerId
<< " seems to be active. Ignoring the registration timeout"
<< " for the old executor run " << containerId;
return;
}
switch (executor->state) {
case Executor::RUNNING:
case Executor::TERMINATING:
case Executor::TERMINATED:
// Ignore the registration timeout.
break;
case Executor::REGISTERING: {
LOG(INFO) << "Terminating executor '" << *executor
<< "' because it did not register within "
<< flags.executor_registration_timeout;
// Immediately kill the executor.
containerizer->destroy(containerId);
executor->state = Executor::TERMINATING;
containerizer::Termination termination;
termination.set_state(TASK_FAILED);
termination.add_reasons(TaskStatus::REASON_EXECUTOR_REGISTRATION_TIMEOUT);
termination.set_message(
"Executor did not register within " +
stringify(flags.executor_registration_timeout));
executor->pendingTermination = termination;
break;
}
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
}
// TODO(vinod): Figure out a way to express this function via cmd line.
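// Computes the maximum allowed age of executor directories as a linear
// function of disk usage: the closer usage gets to (1 - gc_disk_headroom),
// the shorter the allowed age. Illustrative example (hypothetical values):
// with gc_delay = 1 week, gc_disk_headroom = 0.1 and usage = 0.55, the
// result is 1 week * max(0.0, 1.0 - 0.1 - 0.55) = 0.35 weeks.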
Duration Slave::age(double usage)
{
return flags.gc_delay * std::max(0.0, (1.0 - flags.gc_disk_headroom - usage));
}
void Slave::checkDiskUsage()
{
// TODO(vinod): We are making usage a Future, so that we can plug in
// fs::usage() into async.
// NOTE: We calculate disk usage of the file system on which the
// slave work directory is mounted.
Future<double>(::fs::usage(flags.work_dir))
.onAny(defer(self(), &Slave::_checkDiskUsage, lambda::_1));
}
void Slave::_checkDiskUsage(const Future<double>& usage)
{
if (!usage.isReady()) {
LOG(ERROR) << "Failed to get disk usage: "
<< (usage.isFailed() ? usage.failure() : "future discarded");
} else {
executorDirectoryMaxAllowedAge = age(usage.get());
LOG(INFO) << "Current disk usage " << std::setiosflags(std::ios::fixed)
<< std::setprecision(2) << 100 * usage.get() << "%."
<< " Max allowed age: " << executorDirectoryMaxAllowedAge;
// We prune all directories whose deletion time is within
// the next 'gc_delay - age'. Since a directory is always
// scheduled for deletion 'gc_delay' into the future, only directories
// that are at least 'age' old are deleted.
gc->prune(flags.gc_delay - executorDirectoryMaxAllowedAge);
}
delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
}
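// Recovers the slave from checkpointed state: first the checkpointed
// resources, then the SlaveInfo and frameworks, and finally chains into
// status update manager and containerizer recovery.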
Future<Nothing> Slave::recover(const Result<state::State>& state)
{
if (state.isError()) {
return Failure(state.error());
}
Option<ResourcesState> resourcesState;
Option<SlaveState> slaveState;
if (state.isSome()) {
resourcesState = state.get().resources;
slaveState = state.get().slave;
}
// Recover checkpointed resources.
// NOTE: 'resourcesState' is None if the slave rootDir does not
// exist or the resources checkpoint file cannot be found.
if (resourcesState.isSome()) {
if (resourcesState.get().errors > 0) {
LOG(WARNING) << "Errors encountered during resources recovery: "
<< resourcesState.get().errors;
metrics.recovery_errors += resourcesState.get().errors;
}
// This is to verify that the checkpointed resources are
// compatible with the slave resources specified through the
// '--resources' command line flag.
Try<Resources> totalResources = applyCheckpointedResources(
info.resources(),
resourcesState.get().resources);
if (totalResources.isError()) {
return Failure(
"Checkpointed resources " +
stringify(resourcesState.get().resources) +
" are incompatible with slave resources " +
stringify(info.resources()) + ": " +
totalResources.error());
}
checkpointedResources = resourcesState.get().resources;
}
if (slaveState.isSome() && slaveState.get().info.isSome()) {
// Check for SlaveInfo compatibility.
// TODO(vinod): Also check for version compatibility.
// NOTE: We set the 'id' field in 'info' from the recovered slave,
// as a hack to compare the info created from options/flags with
// the recovered info.
info.mutable_id()->CopyFrom(slaveState.get().id);
if (flags.recover == "reconnect" &&
!(info == slaveState.get().info.get())) {
return Failure(strings::join(
"\n",
"Incompatible slave info detected.",
"------------------------------------------------------------",
"Old slave info:\n" + stringify(slaveState.get().info.get()),
"------------------------------------------------------------",
"New slave info:\n" + stringify(info),
"------------------------------------------------------------"));
}
info = slaveState.get().info.get(); // Recover the slave info.
if (slaveState.get().errors > 0) {
LOG(WARNING) << "Errors encountered during slave recovery: "
<< slaveState.get().errors;
metrics.recovery_errors += slaveState.get().errors;
}
// TODO(bernd-mesos): Make this an instance method call, see comment
// in "fetcher.hpp"".
Try<Nothing> recovered = Fetcher::recover(slaveState.get().id, flags);
if (recovered.isError()) {
return Failure(recovered.error());
}
// Recover the frameworks.
foreachvalue (const FrameworkState& frameworkState,
slaveState.get().frameworks) {
recoverFramework(frameworkState);
}
}
return statusUpdateManager->recover(metaDir, slaveState)
.then(defer(self(), &Slave::_recoverContainerizer, slaveState));
}
Future<Nothing> Slave::_recoverContainerizer(
const Option<state::SlaveState>& state)
{
return containerizer->recover(state);
}
Future<Nothing> Slave::_recover()
{
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
// Set up callback for executor termination.
containerizer->wait(executor->containerId)
.onAny(defer(self(),
&Self::executorTerminated,
framework->id(),
executor->id,
lambda::_1));
if (flags.recover == "reconnect") {
// We send a reconnect message for PID based executors
// as we can initiate communication with them. Recovered
// HTTP executors, on the other hand, are responsible for
// subscribing back with the agent using a retry interval.
// Note that recovered http executors are marked with
// http.isNone and pid.isNone (see comments in the header).
if (executor->pid.isSome() && executor->pid.get()) {
LOG(INFO)
<< "Sending reconnect request to executor " << *executor;
ReconnectExecutorMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
send(executor->pid.get(), message);
} else if (executor->pid.isNone()) {
LOG(INFO) << "Waiting for executor " << *executor
<< " to subscribe";
} else {
LOG(INFO) << "Unable to reconnect to executor " << *executor
<< " because no pid or http checkpoint file was found";
}
} else {
// For PID-based executors, we ask the executor to shut
// down and give it time to terminate. For HTTP executors,
// we do the same, however, the shutdown will only be sent
// when the executor subscribes.
if ((executor->pid.isSome() && executor->pid.get()) ||
executor->pid.isNone()) {
LOG(INFO) << "Sending shutdown to executor " << *executor;
_shutdownExecutor(framework, executor);
} else {
LOG(INFO) << "Killing executor " << *executor
<< " because no pid or http checkpoint file was found";
containerizer->destroy(executor->containerId);
}
}
}
}
if (!frameworks.empty() && flags.recover == "reconnect") {
// Cleanup unregistered executors after a delay.
delay(EXECUTOR_REREGISTER_TIMEOUT,
self(),
&Slave::reregisterExecutorTimeout);
// We set 'recovered' flag inside reregisterExecutorTimeout(),
// so that when the slave re-registers with master it can
// correctly inform the master about the launched tasks.
return recovered.future();
}
return Nothing();
}
void Slave::__recover(const Future<Nothing>& future)
{
if (!future.isReady()) {
EXIT(1)
<< "Failed to perform recovery: "
<< (future.isFailed() ? future.failure() : "future discarded") << "\n"
<< "To remedy this do as follows:\n"
<< "Step 1: rm -f " << paths::getLatestSlavePath(metaDir) << "\n"
<< " This ensures slave doesn't recover old live executors.\n"
<< "Step 2: Restart the slave.";
}
LOG(INFO) << "Finished recovery";
CHECK_EQ(RECOVERING, state);
// Checkpoint boot ID.
Try<string> bootId = os::bootId();
if (bootId.isError()) {
LOG(ERROR) << "Could not retrieve boot id: " << bootId.error();
} else {
const string path = paths::getBootIdPath(metaDir);
CHECK_SOME(state::checkpoint(path, bootId.get()));
}
// Schedule all old slave directories for garbage collection.
// TODO(vinod): Do this as part of recovery. This needs a fix
// in the recovery code, to recover all slaves instead of only
// the latest slave.
const string directory = path::join(flags.work_dir, "slaves");
Try<list<string>> entries = os::ls(directory);
if (entries.isSome()) {
foreach (const string& entry, entries.get()) {
string path = path::join(directory, entry);
// Ignore non-directory entries.
if (!os::stat::isdir(path)) {
continue;
}
// We garbage collect a directory if either the slave has not
// recovered its id (hence going to get a new id when it
// registers with the master) or if it is an old work directory.
SlaveID slaveId;
slaveId.set_value(entry);
if (!info.has_id() || !(slaveId == info.id())) {
LOG(INFO) << "Garbage collecting old slave " << slaveId;
// NOTE: We update the modification time of the slave work/meta
// directories even though these are old because these
// directories might not have been scheduled for gc before.
// GC the slave work directory.
os::utime(path); // Update the modification time.
garbageCollect(path);
// GC the slave meta directory.
path = paths::getSlavePath(metaDir, slaveId);
if (os::exists(path)) {
os::utime(path); // Update the modification time.
garbageCollect(path);
}
}
}
}
if (flags.recover == "reconnect") {
state = DISCONNECTED;
// Start detecting masters.
detection = detector->detect()
.onAny(defer(self(), &Slave::detected, lambda::_1));
// Forward oversubscribed resources.
forwardOversubscribed();
// Start acting on correction from QoS Controller.
qosCorrections();
} else {
// Slave started in cleanup mode.
CHECK_EQ("cleanup", flags.recover);
state = TERMINATING;
if (frameworks.empty()) {
terminate(self());
}
// If there are active executors/frameworks, the slave will
// shutdown when all the executors are terminated. Note that
// the executors are guaranteed to terminate because they
// are sent a shutdown signal in '_recover()', which results in
// 'Containerizer::destroy()' being called if the termination
// doesn't happen within a timeout.
}
recovered.set(Nothing()); // Signal recovery.
}
void Slave::recoverFramework(const FrameworkState& state)
{
LOG(INFO) << "Recovering framework " << state.id;
if (state.executors.empty()) {
// GC the framework work directory.
garbageCollect(
paths::getFrameworkPath(flags.work_dir, info.id(), state.id));
// GC the framework meta directory.
garbageCollect(
paths::getFrameworkPath(metaDir, info.id(), state.id));
return;
}
CHECK(!frameworks.contains(state.id));
CHECK_SOME(state.info);
FrameworkInfo frameworkInfo = state.info.get();
// Mesos 0.22 and earlier didn't write the FrameworkID into the
// FrameworkInfo. In this case, we update FrameworkInfo.framework_id
// from the directory name and re-checkpoint it in the new format
// when we are done.
bool recheckpoint = false;
if (!frameworkInfo.has_id()) {
frameworkInfo.mutable_id()->CopyFrom(state.id);
recheckpoint = true;
}
CHECK(frameworkInfo.has_id());
CHECK(frameworkInfo.checkpoint());
// In 0.24.0, HTTP schedulers are supported and these do not
// have a 'pid'. In this case, the slave will checkpoint UPID().
CHECK_SOME(state.pid);
Option<UPID> pid = state.pid.get();
if (pid.get() == UPID()) {
pid = None();
}
Framework* framework = new Framework(this, frameworkInfo, pid);
frameworks[framework->id()] = framework;
if (recheckpoint) {
framework->checkpointFramework();
}
// Now recover the executors for this framework.
foreachvalue (const ExecutorState& executorState, state.executors) {
framework->recoverExecutor(executorState);
}
// Remove the framework in case we didn't recover any executors.
if (framework->executors.empty()) {
removeFramework(framework);
}
}
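// Schedules 'path' for garbage collection 'gc_delay' after its last
// modification time. Callers that want the full delay update the mtime
// via os::utime() before calling this.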
Future<Nothing> Slave::garbageCollect(const string& path)
{
Try<long> mtime = os::stat::mtime(path);
if (mtime.isError()) {
LOG(ERROR) << "Failed to find the mtime of '" << path
<< "': " << mtime.error();
return Failure(mtime.error());
}
// It is unsafe for testing to use unix time directly; we must use
// Time::create to convert it into a Time object that reflects the
// possibly advanced state of the libprocess Clock.
Try<Time> time = Time::create(mtime.get());
CHECK_SOME(time);
// GC based on the modification time.
Duration delay = flags.gc_delay - (Clock::now() - time.get());
return gc->schedule(delay, path);
}
void Slave::forwardOversubscribed()
{
VLOG(1) << "Querying resource estimator for oversubscribable resources";
resourceEstimator->oversubscribable()
.onAny(defer(self(), &Self::_forwardOversubscribed, lambda::_1));
}
void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
{
if (!oversubscribable.isReady()) {
LOG(ERROR) << "Failed to get oversubscribable resources: "
<< (oversubscribable.isFailed()
? oversubscribable.failure() : "future discarded");
} else {
VLOG(1) << "Received oversubscribable resources "
<< oversubscribable.get() << " from the resource estimator";
// Calculate the latest allocation of oversubscribed resources.
// Note that this allocation value might be different from the
// master's view because new tasks/executors might be in flight from
// the master or pending on the slave. This is ok because the
// allocator only considers the slave's view of allocation when
// calculating the available oversubscribed resources to offer.
Resources oversubscribed;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
oversubscribed += executor->resources.revocable();
}
}
// Add oversubscribable resources to the total.
oversubscribed += oversubscribable.get();
// Only forward the estimate if it's different from the previous
// estimate. We also send this whenever we get (re-)registered
// (i.e. whenever we transition into the RUNNING state).
if (state == RUNNING && oversubscribedResources != oversubscribed) {
LOG(INFO) << "Forwarding total oversubscribed resources "
<< oversubscribed;
UpdateSlaveMessage message;
message.mutable_slave_id()->CopyFrom(info.id());
message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
CHECK_SOME(master);
send(master.get(), message);
}
// Update the estimate.
oversubscribedResources = oversubscribed;
}
delay(flags.oversubscribed_resources_interval,
self(),
&Self::forwardOversubscribed);
}
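// Polls the QoS controller for corrections; the handler below re-schedules
// this poll every 'qos_correction_interval_min'.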
void Slave::qosCorrections()
{
qosController->corrections()
.onAny(defer(self(), &Self::_qosCorrections, lambda::_1));
}
void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
{
// Make sure correction handler is scheduled again.
delay(flags.qos_correction_interval_min, self(), &Self::qosCorrections);
// Verify slave state.
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
if (state == RECOVERING || state == TERMINATING) {
LOG(WARNING) << "Cannot perform QoS corrections because the slave is "
<< state;
return;
}
if (!future.isReady()) {
LOG(WARNING) << "Failed to get corrections from QoS Controller: "
<< (future.isFailed() ? future.failure() : "discarded");
return;
}
const list<QoSCorrection>& corrections = future.get();
LOG(INFO) << "Received " << corrections.size() << " QoS corrections";
foreach (const QoSCorrection& correction, corrections) {
// TODO(nnielsen): Print correction, once the operator overload
// for QoSCorrection has been implemented.
if (correction.type() == QoSCorrection::KILL) {
const QoSCorrection::Kill& kill = correction.kill();
if (!kill.has_framework_id()) {
LOG(WARNING) << "Ignoring QoS correction KILL: "
<< "framework id not specified.";
continue;
}
const FrameworkID& frameworkId = kill.framework_id();
if (!kill.has_executor_id()) {
// TODO(nnielsen): For now, only executor killing is supported. Check
// can be removed when task killing is supported as well.
LOG(WARNING) << "Ignoring QoS correction KILL on framework "
<< frameworkId << ": executor id not specified";
continue;
}
const ExecutorID& executorId = kill.executor_id();
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
LOG(WARNING) << "Ignoring QoS correction KILL on framework "
<< frameworkId << ": framework cannot be found";
continue;
}
// Verify framework state.
CHECK(framework->state == Framework::RUNNING ||
framework->state == Framework::TERMINATING)
<< framework->state;
if (framework->state == Framework::TERMINATING) {
LOG(WARNING) << "Ignoring QoS correction KILL on framework "
<< frameworkId << ": framework is terminating.";
continue;
}
Executor* executor = framework->getExecutor(executorId);
if (executor == NULL) {
LOG(WARNING) << "Ignoring QoS correction KILL on executor '"
<< executorId << "' of framework " << frameworkId
<< ": executor cannot be found";
continue;
}
const ContainerID containerId =
kill.has_container_id() ? kill.container_id() : executor->containerId;
if (containerId != executor->containerId) {
LOG(WARNING) << "Ignoring QoS correction KILL on container '"
<< containerId << "' for executor " << *executor
<< ": container cannot be found";
continue;
}
switch (executor->state) {
case Executor::REGISTERING:
case Executor::RUNNING: {
LOG(INFO) << "Killing container '" << containerId
<< "' for executor " << *executor
<< " as QoS correction";
containerizer->destroy(containerId);
// TODO(nnielsen): We should ensure that we are addressing
// the _container_ which the QoS controller intended to
// kill. Without this check, we may run into a scenario
// where the executor has terminated and one with the same
// id has started in the interim i.e. running in a different
// container than the one the QoS controller targeted
// (MESOS-2875).
executor->state = Executor::TERMINATING;
containerizer::Termination termination;
termination.set_state(TASK_LOST);
termination.add_reasons(TaskStatus::REASON_CONTAINER_PREEMPTED);
termination.set_message("Container preempted by QoS correction");
executor->pendingTermination = termination;
++metrics.executors_preempted;
break;
}
case Executor::TERMINATING:
case Executor::TERMINATED:
LOG(WARNING) << "Ignoring QoS correction KILL on executor "
<< *executor << " because the executor is in "
<< executor->state << " state";
break;
default:
LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
<< executor->state;
break;
}
} else {
LOG(WARNING) << "QoS correction type " << correction.type()
<< " is not supported";
}
}
}
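// Builds a ResourceUsage snapshot containing one entry per active executor
// (executor info, allocated resources, and container id), statistics
// gathered asynchronously from the containerizer, and the slave's total
// resources.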
Future<ResourceUsage> Slave::usage()
{
// NOTE: We use 'Owned' here to avoid an expensive copy. C++11
// lambdas only support capturing variables that have copy
// constructors. Revisit once we remove the copy constructor for
// Owned (or once C++14 generalized lambda capture is supported).
Owned<ResourceUsage> usage(new ResourceUsage());
list<Future<ResourceStatistics>> futures;
foreachvalue (const Framework* framework, frameworks) {
foreachvalue (const Executor* executor, framework->executors) {
ResourceUsage::Executor* entry = usage->add_executors();
entry->mutable_executor_info()->CopyFrom(executor->info);
entry->mutable_allocated()->CopyFrom(executor->resources);
entry->mutable_container_id()->CopyFrom(executor->containerId);
futures.push_back(containerizer->usage(executor->containerId));
}
}
Try<Resources> totalResources = applyCheckpointedResources(
info.resources(),
checkpointedResources);
CHECK_SOME(totalResources)
<< "Failed to apply checkpointed resources "
<< checkpointedResources << " to slave's resources "
<< info.resources();
usage->mutable_total()->CopyFrom(totalResources.get());
return await(futures).then(
[usage](const list<Future<ResourceStatistics>>& futures) {
// NOTE: We add ResourceUsage::Executor entries to 'usage' in the same
// order as we push futures to 'futures', so the variables 'future'
// and 'executor' in the loop below stay in sync.
CHECK_EQ(futures.size(), (size_t) usage->executors_size());
size_t i = 0;
foreach (const Future<ResourceStatistics>& future, futures) {
ResourceUsage::Executor* executor = usage->mutable_executors(i++);
if (future.isReady()) {
executor->mutable_statistics()->CopyFrom(future.get());
} else {
LOG(WARNING) << "Failed to get resource statistics for executor '"
<< executor->executor_info().executor_id() << "'"
<< " of framework "
<< executor->executor_info().framework_id() << ": "
<< (future.isFailed() ? future.failure()
: "discarded");
}
}
return Future<ResourceUsage>(*usage);
});
}
// TODO(dhamon): Move these to their own metrics.hpp|cpp.
double Slave::_tasks_staging()
{
double count = 0.0;
foreachvalue (Framework* framework, frameworks) {
typedef hashmap<TaskID, TaskInfo> TaskMap;
foreachvalue (const TaskMap& tasks, framework->pending) {
count += tasks.size();
}
foreachvalue (Executor* executor, framework->executors) {
count += executor->queuedTasks.size();
foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_STAGING) {
count++;
}
}
}
}
return count;
}
double Slave::_tasks_starting()
{
double count = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_STARTING) {
count++;
}
}
}
}
return count;
}
double Slave::_tasks_running()
{
double count = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_RUNNING) {
count++;
}
}
}
}
return count;
}
double Slave::_executors_registering()
{
double count = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
if (executor->state == Executor::REGISTERING) {
count++;
}
}
}
return count;
}
double Slave::_executors_running()
{
double count = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
if (executor->state == Executor::RUNNING) {
count++;
}
}
}
return count;
}
double Slave::_executors_terminating()
{
double count = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
if (executor->state == Executor::TERMINATING) {
count++;
}
}
}
return count;
}
double Slave::_executor_directory_max_allowed_age_secs()
{
return executorDirectoryMaxAllowedAge.secs();
}
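// Sends a status update for a task whose executor has terminated. The task
// state and reason are taken from the containerizer's termination result
// when available, falling back to the executor's pending termination and
// finally to TASK_FAILED / REASON_EXECUTOR_TERMINATED; the message combines
// the pending termination and termination messages when present.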
void Slave::sendExecutorTerminatedStatusUpdate(
const TaskID& taskId,
const Future<containerizer::Termination>& termination,
const FrameworkID& frameworkId,
const Executor* executor)
{
CHECK_NOTNULL(executor);
mesos::TaskState state;
TaskStatus::Reason reason;
string message;
// Determine the task state for the status update.
if (termination.isReady() && termination->has_state()) {
state = termination->state();
} else if (executor->pendingTermination.isSome() &&
executor->pendingTermination->has_state()) {
state = executor->pendingTermination->state();
} else {
state = TASK_FAILED;
}
// Determine the task reason for the status update.
// TODO(jieyu): Handle multiple reasons (MESOS-2657).
if (termination.isReady() && termination->reasons().size() > 0) {
reason = termination->reasons(0);
} else if (executor->pendingTermination.isSome() &&
executor->pendingTermination->reasons().size() > 0) {
reason = executor->pendingTermination->reasons(0);
} else {
reason = TaskStatus::REASON_EXECUTOR_TERMINATED;
}
// Determine the message for the status update.
vector<string> messages;
if (executor->pendingTermination.isSome() &&
executor->pendingTermination->has_message()) {
messages.push_back(executor->pendingTermination->message());
}
if (!termination.isReady()) {
messages.push_back("Abnormal executor termination");
} else if (termination->has_message()) {
messages.push_back(termination->message());
}
if (messages.empty()) {
message = "Executor terminated";
} else {
message = strings::join("; ", messages);
}
statusUpdate(protobuf::createStatusUpdate(
frameworkId,
info.id(),
taskId,
state,
TaskStatus::SOURCE_SLAVE,
UUID::random(),
message,
reason,
executor->id),
UPID());
}
double Slave::_resources_total(const string& name)
{
double total = 0.0;
foreach (const Resource& resource, info.resources()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
total += resource.scalar().value();
}
}
return total;
}
double Slave::_resources_used(const string& name)
{
double used = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
foreach (const Resource& resource, executor->resources.nonRevocable()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
used += resource.scalar().value();
}
}
}
}
return used;
}
double Slave::_resources_percent(const string& name)
{
double total = _resources_total(name);
if (total == 0.0) {
return 0.0;
}
return _resources_used(name) / total;
}
double Slave::_resources_revocable_total(const string& name)
{
double total = 0.0;
if (oversubscribedResources.isSome()) {
foreach (const Resource& resource, oversubscribedResources.get()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
total += resource.scalar().value();
}
}
}
return total;
}
double Slave::_resources_revocable_used(const string& name)
{
double used = 0.0;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
foreach (const Resource& resource, executor->resources.revocable()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
used += resource.scalar().value();
}
}
}
}
return used;
}
double Slave::_resources_revocable_percent(const string& name)
{
double total = _resources_revocable_total(name);
if (total == 0.0) {
return 0.0;
}
return _resources_revocable_used(name) / total;
}
Framework::Framework(
Slave* _slave,
const FrameworkInfo& _info,
const Option<UPID>& _pid)
: state(RUNNING),
slave(_slave),
info(_info),
pid(_pid),
completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK) {}
void Framework::checkpointFramework() const
{
// Checkpoint the framework info.
string path = paths::getFrameworkInfoPath(
slave->metaDir, slave->info.id(), id());
VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'";
CHECK_SOME(state::checkpoint(path, info));
// Checkpoint the framework pid. Note that we checkpoint a
// UPID() when it is None (for HTTP schedulers) because
// 0.23.x slaves consider a missing pid file to be an
// error.
path = paths::getFrameworkPidPath(
slave->metaDir, slave->info.id(), id());
VLOG(1) << "Checkpointing framework pid"
<< " '" << pid.getOrElse(UPID()) << "'"
<< " to '" << path << "'";
CHECK_SOME(state::checkpoint(path, pid.getOrElse(UPID())));
}
Framework::~Framework()
{
// We own the non-completed executor pointers, so they need to be deleted.
foreachvalue (Executor* executor, executors) {
delete executor;
}
}
// Create and launch an executor.
Executor* Framework::launchExecutor(
const ExecutorInfo& executorInfo,
const TaskInfo& taskInfo)
{
// Generate an ID for the executor's container.
// TODO(idownes) This should be done by the containerizer but we
// need the ContainerID to create the executor's directory. Fix
// this when 'launchExecutor()' is handled asynchronously.
ContainerID containerId;
containerId.set_value(UUID::random().toString());
Option<string> user = None();
#ifndef __WINDOWS__
if (slave->flags.switch_user) {
// The command (either in form of task or executor command) can
// define a specific user to run as. If present, this precedes the
// framework user value. The selected user will have been verified by
// the master at this point through the active ACLs.
// NOTE: The global invariant is that the executor info at this
// point is (1) the user provided task.executor() or (2) a command
// executor constructed by the slave from the task.command().
// If this changes, we need to check the user in both
// task.command() and task.executor().command() below.
user = info.user();
if (executorInfo.command().has_user()) {
user = executorInfo.command().user();
}
}
#endif // __WINDOWS__
// Create a directory for the executor.
const string directory = paths::createExecutorDirectory(
slave->flags.work_dir,
slave->info.id(),
id(),
executorInfo.executor_id(),
containerId,
user);
Executor* executor = new Executor(
slave, id(), executorInfo, containerId, directory, info.checkpoint());
if (executor->checkpoint) {
executor->checkpointExecutor();
}
CHECK(!executors.contains(executorInfo.executor_id()))
<< "Duplicate executor " << executorInfo.executor_id();
executors[executorInfo.executor_id()] = executor;
LOG(INFO) << "Launching executor " << executorInfo.executor_id()
<< " of framework " << id()
<< " with resources " << executorInfo.resources()
<< " in work directory '" << directory << "'";
slave->files->attach(executor->directory, executor->directory)
.onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
// Tell the containerizer to launch the executor.
// NOTE: We modify the ExecutorInfo to include the task's
// resources when launching the executor so that the containerizer
// has non-zero resources to work with when the executor has
// no resources. This should be revisited after MESOS-600.
ExecutorInfo executorInfo_ = executor->info;
Resources resources = executorInfo_.resources();
resources += taskInfo.resources();
executorInfo_.mutable_resources()->CopyFrom(resources);
// Launch the container.
Future<bool> launch;
if (!executor->isCommandExecutor()) {
// If the executor is _not_ a command executor, this means that
// the task will include the executor to run. The actual task to
// run will be enqueued and subsequently handled by the executor
// when it has registered to the slave.
launch = slave->containerizer->launch(
containerId,
executorInfo_, // Modified to include the task's resources, see above.
executor->directory,
user,
slave->info.id(),
slave->self(),
info.checkpoint());
} else {
// An executor has _not_ been provided by the task and will
// instead define a command and/or container to run. Right now,
// these tasks will require an executor anyway and the slave
// creates a command executor. However, it is up to the
// containerizer how to execute those tasks and the generated
// executor info works as a placeholder.
// TODO(nnielsen): Obsolete the requirement for executors to run
// one-off tasks.
launch = slave->containerizer->launch(
containerId,
taskInfo,
executorInfo_, // Modified to include the task's resources, see above.
executor->directory,
user,
slave->info.id(),
slave->self(),
info.checkpoint());
}
launch.onAny(defer(slave,
&Slave::executorLaunched,
id(),
executor->id,
containerId,
lambda::_1));
// Make sure the executor registers within the given timeout.
delay(slave->flags.executor_registration_timeout,
slave,
&Slave::registerExecutorTimeout,
id(),
executor->id,
containerId);
return executor;
}
void Framework::destroyExecutor(const ExecutorID& executorId)
{
if (executors.contains(executorId)) {
Executor* executor = executors[executorId];
executors.erase(executorId);
// Pass ownership of the executor pointer.
completedExecutors.push_back(Owned<Executor>(executor));
}
}
Executor* Framework::getExecutor(const ExecutorID& executorId)
{
if (executors.contains(executorId)) {
return executors[executorId];
}
return NULL;
}
Executor* Framework::getExecutor(const TaskID& taskId)
{
foreachvalue (Executor* executor, executors) {
if (executor->queuedTasks.contains(taskId) ||
executor->launchedTasks.contains(taskId) ||
executor->terminatedTasks.contains(taskId)) {
return executor;
}
}
return NULL;
}
void Framework::recoverExecutor(const ExecutorState& state)
{
LOG(INFO) << "Recovering executor '" << state.id
<< "' of framework " << id();
CHECK_NOTNULL(slave);
if (state.runs.empty() || state.latest.isNone() || state.info.isNone()) {
LOG(WARNING) << "Skipping recovery of executor '" << state.id
<< "' of framework " << id()
<< " because its latest run or executor info"
<< " cannot be recovered";
// GC the top level executor work directory.
slave->garbageCollect(paths::getExecutorPath(
slave->flags.work_dir, slave->info.id(), id(), state.id));
// GC the top level executor meta directory.
slave->garbageCollect(paths::getExecutorPath(
slave->metaDir, slave->info.id(), id(), state.id));
return;
}
// We are only interested in the latest run of the executor!
// So, we GC all the old runs.
// NOTE: We don't schedule the top level executor work and meta
// directories for GC here, because they will be scheduled when
// the latest executor run terminates.
const ContainerID& latest = state.latest.get();
foreachvalue (const RunState& run, state.runs) {
CHECK_SOME(run.id);
const ContainerID& runId = run.id.get();
if (latest != runId) {
// GC the executor run's work directory.
// TODO(vinod): Expose this directory to webui by recovering the
// tasks and doing a 'files->attach()'.
slave->garbageCollect(paths::getExecutorRunPath(
slave->flags.work_dir, slave->info.id(), id(), state.id, runId));
// GC the executor run's meta directory.
slave->garbageCollect(paths::getExecutorRunPath(
slave->metaDir, slave->info.id(), id(), state.id, runId));
}
}
Option<RunState> run = state.runs.get(latest);
CHECK_SOME(run)
<< "Cannot find latest run " << latest << " for executor " << state.id
<< " of framework " << id();
// Create executor.
const string directory = paths::getExecutorRunPath(
slave->flags.work_dir, slave->info.id(), id(), state.id, latest);
Executor* executor = new Executor(
slave, id(), state.info.get(), latest, directory, info.checkpoint());
// Recover the libprocess PID if possible for PID based executors.
if (run.get().http.isSome()) {
if (!run.get().http.get()) {
// When recovering in non-strict mode, the assumption is that the
// slave can die after checkpointing the forked pid but before the
// libprocess pid. So, it is not possible for the libprocess pid
// to exist but not the forked pid. If so, it is a really bad
// situation (e.g., disk corruption).
CHECK_SOME(run.get().forkedPid)
<< "Failed to get forked pid for executor " << state.id
<< " of framework " << id();
executor->pid = run.get().libprocessPid.get();
} else {
// We set the PID to None() to signify that this is a HTTP based
// executor.
executor->pid = None();
}
} else {
// We set the PID to UPID() to signify that the connection type for this
// executor is unknown.
executor->pid = UPID();
}
// And finally recover all the executor's tasks.
foreachvalue (const TaskState& taskState, run.get().tasks) {
executor->recoverTask(taskState);
}
// Expose the executor's files.
slave->files->attach(executor->directory, executor->directory)
.onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
// Add the executor to the framework.
executors[executor->id] = executor;
// If the latest run of the executor was completed (i.e., terminated
// and all updates are acknowledged) in the previous run, we
// transition its state to 'TERMINATED' and gc the directories.
if (run.get().completed) {
++slave->metrics.executors_terminated;
executor->state = Executor::TERMINATED;
CHECK_SOME(run.get().id);
const ContainerID& runId = run.get().id.get();
// GC the executor run's work directory.
const string path = paths::getExecutorRunPath(
slave->flags.work_dir, slave->info.id(), id(), state.id, runId);
slave->garbageCollect(path)
.then(defer(slave, &Slave::detachFile, path));
// GC the executor run's meta directory.
slave->garbageCollect(paths::getExecutorRunPath(
slave->metaDir, slave->info.id(), id(), state.id, runId));
// GC the top level executor work directory.
slave->garbageCollect(paths::getExecutorPath(
slave->flags.work_dir, slave->info.id(), id(), state.id));
// GC the top level executor meta directory.
slave->garbageCollect(paths::getExecutorPath(
slave->metaDir, slave->info.id(), id(), state.id));
// Move the executor to 'completedExecutors'.
destroyExecutor(executor->id);
}
return;
}
Executor::Executor(
Slave* _slave,
const FrameworkID& _frameworkId,
const ExecutorInfo& _info,
const ContainerID& _containerId,
const string& _directory,
bool _checkpoint)
: state(REGISTERING),
slave(_slave),
id(_info.executor_id()),
info(_info),
frameworkId(_frameworkId),
containerId(_containerId),
directory(_directory),
checkpoint(_checkpoint),
http(None()),
pid(None()),
resources(_info.resources()),
completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
{
CHECK_NOTNULL(slave);
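// Determine whether this is a command executor by checking if the
// executor's command references the built-in 'mesos-executor' binary
// from the launcher directory.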
Result<string> executorPath =
os::realpath(path::join(slave->flags.launcher_dir, "mesos-executor"));
if (executorPath.isSome()) {
commandExecutor =
strings::contains(info.command().value(), executorPath.get());
}
}
Executor::~Executor()
{
if (http.isSome()) {
closeHttpConnection();
}
// Delete the tasks.
// TODO(vinod): Use foreachvalue instead once LinkedHashmap
// supports it.
foreach (Task* task, launchedTasks.values()) {
delete task;
}
foreach (Task* task, terminatedTasks.values()) {
delete task;
}
}
Task* Executor::addTask(const TaskInfo& task)
{
// The master should enforce unique task IDs, but just in case it
// doesn't, consider not making this a fatal error.
CHECK(!launchedTasks.contains(task.task_id()))
<< "Duplicate task " << task.task_id();
Task* t = new Task(protobuf::createTask(task, TASK_STAGING, frameworkId));
launchedTasks[task.task_id()] = t;
resources += task.resources();
return t;
}
void Executor::terminateTask(
const TaskID& taskId,
const mesos::TaskStatus& status)
{
VLOG(1) << "Terminating task " << taskId;
Task* task = NULL;
// Remove the task if it's queued.
if (queuedTasks.contains(taskId)) {
task = new Task(
protobuf::createTask(queuedTasks[taskId], status.state(), frameworkId));
queuedTasks.erase(taskId);
} else if (launchedTasks.contains(taskId)) {
// Update the resources if it's been launched.
task = launchedTasks[taskId];
resources -= task->resources();
launchedTasks.erase(taskId);
}
switch (status.state()) {
case TASK_FINISHED:
++slave->metrics.tasks_finished;
break;
case TASK_FAILED:
++slave->metrics.tasks_failed;
break;
case TASK_KILLED:
++slave->metrics.tasks_killed;
break;
case TASK_LOST:
++slave->metrics.tasks_lost;
break;
default:
LOG(WARNING) << "Unhandled task state " << status.state()
<< " on completion.";
break;
}
// TODO(dhamon): Update source/reason metrics.
terminatedTasks[taskId] = CHECK_NOTNULL(task);
}
void Executor::completeTask(const TaskID& taskId)
{
VLOG(1) << "Completing task " << taskId;
CHECK(terminatedTasks.contains(taskId))
<< "Failed to find terminated task " << taskId;
Task* task = terminatedTasks[taskId];
completedTasks.push_back(std::shared_ptr<Task>(task));
terminatedTasks.erase(taskId);
}
void Executor::checkpointExecutor()
{
CHECK(checkpoint);
CHECK_NE(slave->state, slave->RECOVERING);
// Checkpoint the executor info.
const string path = paths::getExecutorInfoPath(
slave->metaDir, slave->info.id(), frameworkId, id);
VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'";
CHECK_SOME(state::checkpoint(path, info));
// Create the meta executor directory.
// NOTE: This creates the 'latest' symlink in the meta directory.
paths::createExecutorDirectory(
slave->metaDir, slave->info.id(), frameworkId, id, containerId);
}
void Executor::checkpointTask(const TaskInfo& task)
{
CHECK(checkpoint);
const Task t = protobuf::createTask(task, TASK_STAGING, frameworkId);
const string path = paths::getTaskInfoPath(
slave->metaDir,
slave->info.id(),
frameworkId,
id,
containerId,
t.task_id());
VLOG(1) << "Checkpointing TaskInfo to '" << path << "'";
CHECK_SOME(state::checkpoint(path, t));
}
void Executor::recoverTask(const TaskState& state)
{
if (state.info.isNone()) {
LOG(WARNING) << "Skipping recovery of task " << state.id
<< " because its info cannot be recovered";
return;
}
launchedTasks[state.id] = new Task(state.info.get());
// NOTE: Since some tasks might have been terminated when the
// slave was down, the executor resources we capture here are an
// upper bound. The actual resources needed (for live tasks) by
// the isolator will be calculated when the executor re-registers.
resources += state.info.get().resources();
// Read updates to get the latest state of the task.
foreach (const StatusUpdate& update, state.updates) {
updateTaskState(update.status());
// Terminate the task if it received a terminal update.
// We ignore duplicate terminal updates by checking if
// the task is present in launchedTasks.
// TODO(vinod): Revisit these semantics when we disallow duplicate
// terminal updates (e.g., when slave recovery is always enabled).
if (protobuf::isTerminalState(update.status().state()) &&
launchedTasks.contains(state.id)) {
terminateTask(state.id, update.status());
CHECK(update.has_uuid())
<< "Expecting updates without 'uuid' to have been rejected";
// If the terminal update has been acknowledged, remove it.
if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
completeTask(state.id);
}
break;
}
}
}
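// Applies a status update to the cached task: if the most recent stored
// status has the same state, it is replaced rather than appended, bounding
// the growth of the status history for repeated updates.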
void Executor::updateTaskState(const TaskStatus& status)
{
if (launchedTasks.contains(status.task_id())) {
Task* task = launchedTasks[status.task_id()];
// TODO(brenden): Consider wiping the `data` and `message` fields?
if (task->statuses_size() > 0 &&
task->statuses(task->statuses_size() - 1).state() == status.state()) {
task->mutable_statuses()->RemoveLast();
}
task->add_statuses()->CopyFrom(status);
task->set_state(status.state());
}
}
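// Returns true if the executor still has tasks that are not yet completed,
// i.e., tasks that are queued, launched, or terminated but whose terminal
// status updates have not been acknowledged.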
bool Executor::incompleteTasks()
{
return !queuedTasks.empty() ||
!launchedTasks.empty() ||
!terminatedTasks.empty();
}
bool Executor::isCommandExecutor() const
{
return commandExecutor;
}
void Executor::closeHttpConnection()
{
CHECK_SOME(http);
if (!http.get().close()) {
LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
}
http = None();
}
std::ostream& operator<<(std::ostream& stream, const Executor& executor)
{
stream << "'" << executor.id << "' of framework " << executor.frameworkId;
if (executor.pid.isSome() && executor.pid.get()) {
stream << " at " << executor.pid.get();
} else if (executor.http.isSome() ||
(executor.slave->state == Slave::RECOVERING &&
executor.state == Executor::REGISTERING &&
executor.http.isNone() && executor.pid.isNone())) {
stream << " (via HTTP)";
}
return stream;
}
std::ostream& operator<<(std::ostream& stream, Slave::State state)
{
switch (state) {
case Slave::RECOVERING: return stream << "RECOVERING";
case Slave::DISCONNECTED: return stream << "DISCONNECTED";
case Slave::RUNNING: return stream << "RUNNING";
case Slave::TERMINATING: return stream << "TERMINATING";
default: return stream << "UNKNOWN";
}
}
std::ostream& operator<<(std::ostream& stream, Framework::State state)
{
switch (state) {
case Framework::RUNNING: return stream << "RUNNING";
case Framework::TERMINATING: return stream << "TERMINATING";
default: return stream << "UNKNOWN";
}
}
std::ostream& operator<<(std::ostream& stream, Executor::State state)
{
switch (state) {
case Executor::REGISTERING: return stream << "REGISTERING";
case Executor::RUNNING: return stream << "RUNNING";
case Executor::TERMINATING: return stream << "TERMINATING";
case Executor::TERMINATED: return stream << "TERMINATED";
default: return stream << "UNKNOWN";
}
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {