// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <errno.h>
#include <signal.h>
#include <stdlib.h> // For random().
#include <algorithm>
#include <cmath>
#include <deque>
#include <iomanip>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <mesos/type_utils.hpp>
#include <mesos/authentication/secret_generator.hpp>
#include <mesos/module/authenticatee.hpp>
#include <mesos/state/leveldb.hpp>
#include <mesos/state/in_memory.hpp>
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <process/after.hpp>
#include <process/async.hpp>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/reap.hpp>
#include <process/time.hpp>
#include <process/ssl/flags.hpp>
#include <stout/bytes.hpp>
#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/exit.hpp>
#include <stout/fs.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include <stout/os/realpath.hpp>
#include "authentication/cram_md5/authenticatee.hpp"
#include "common/authorization.hpp"
#include "common/build.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
#include "common/status_utils.hpp"
#include "common/validation.hpp"
#include "credentials/credentials.hpp"
#include "hook/manager.hpp"
#ifdef __linux__
#include "linux/fs.hpp"
#endif // __linux__
#include "logging/logging.hpp"
#include "master/detector/standalone.hpp"
#include "module/manager.hpp"
#include "slave/compatibility.hpp"
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
#include "slave/state.pb.h"
#include "slave/task_status_update_manager.hpp"
#ifdef __WINDOWS__
// Used to install a Windows console ctrl handler.
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms682066(v=vs.85).aspx
#include <slave/windows_ctrlhandler.hpp>
#else
// Used to install a handler for POSIX signal.
// http://pubs.opengroup.org/onlinepubs/009695399/functions/sigaction.html
#include <slave/posix_signalhandler.hpp>
#endif // __WINDOWS__
namespace http = process::http;
using google::protobuf::RepeatedPtrField;
using mesos::SecretGenerator;
using mesos::authorization::createSubject;
using mesos::authorization::ACCESS_SANDBOX;
using mesos::executor::Call;
using mesos::master::detector::MasterDetector;
using mesos::slave::ContainerConfig;
using mesos::slave::ContainerTermination;
using mesos::slave::QoSController;
using mesos::slave::QoSCorrection;
using mesos::slave::ResourceEstimator;
using std::deque;
using std::find;
using std::list;
using std::map;
using std::ostream;
using std::ostringstream;
using std::set;
using std::shared_ptr;
using std::string;
using std::vector;
using process::after;
using process::async;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Break;
using process::Clock;
using process::Continue;
using process::ControlFlow;
using process::Failure;
using process::Future;
using process::Owned;
using process::PID;
using process::Promise;
using process::Time;
using process::UPID;
using process::http::authentication::Principal;
namespace mesos {
namespace internal {
namespace slave {
using namespace state;
// Forward declarations.
// Needed for logging task/task group.
static string taskOrTaskGroup(
const Option<TaskInfo>& task,
const Option<TaskGroupInfo>& taskGroup);
// Returns the command info for the default executor.
static CommandInfo defaultExecutorCommandInfo(
const std::string& launcherDir,
const Option<std::string>& user);
// Sets the executor resource limit (the `limit` parameter) based on the resource
// passed in (the `value` parameter).
static void setLimit(Option<Value::Scalar>& limit, const Value::Scalar& value);
Slave::Slave(const string& id,
const slave::Flags& _flags,
MasterDetector* _detector,
Containerizer* _containerizer,
Files* _files,
GarbageCollector* _gc,
TaskStatusUpdateManager* _taskStatusUpdateManager,
ResourceEstimator* _resourceEstimator,
QoSController* _qosController,
SecretGenerator* _secretGenerator,
VolumeGidManager* _volumeGidManager,
PendingFutureTracker* _futureTracker,
#ifndef __WINDOWS__
const Option<process::network::unix::Socket>& _executorSocket,
#endif // __WINDOWS__
const Option<Authorizer*>& _authorizer)
: ProcessBase(id),
state(RECOVERING),
flags(_flags),
http(this),
capabilities(
_flags.agent_features.isNone()
? protobuf::slave::Capabilities(AGENT_CAPABILITIES())
: protobuf::slave::Capabilities(
_flags.agent_features->capabilities())),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
detector(_detector),
containerizer(_containerizer),
files(_files),
metrics(*this),
gc(_gc),
taskStatusUpdateManager(_taskStatusUpdateManager),
masterPingTimeout(DEFAULT_MASTER_PING_TIMEOUT()),
metaDir(paths::getMetaRootDir(flags.work_dir)),
recoveryErrors(0),
credential(None()),
authenticatee(nullptr),
authenticating(None()),
authenticated(false),
reauthenticate(false),
executorDirectoryMaxAllowedAge(age(0)),
resourceEstimator(_resourceEstimator),
qosController(_qosController),
secretGenerator(_secretGenerator),
volumeGidManager(_volumeGidManager),
futureTracker(_futureTracker),
#ifndef __WINDOWS__
executorSocket(_executorSocket),
#endif // __WINDOWS__
authorizer(_authorizer),
resourceVersion(protobuf::createUUID()) {}
Slave::~Slave()
{
// TODO(benh): Shut down frameworks?
// TODO(benh): Shut down executors? The executor should get an "exited"
// event and initiate a shutdown itself.
foreachvalue (Framework* framework, frameworks) {
delete framework;
}
delete authenticatee;
}
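// Invoked when the agent receives a signal for which a handler was
// installed in `initialize()`; SIGUSR1 causes the agent to unregister
// from the master and shut down.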
void Slave::signaled(int signal, int uid)
{
if (signal == SIGUSR1) {
Result<string> user = os::user(uid);
shutdown(
UPID(),
"Received SIGUSR1 signal" +
(user.isSome() ? " from user " + user.get() : ""));
}
}
void Slave::initialize()
{
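// NOTE: `self()` stringifies to something like `slave(1)@127.0.0.1:5051`
// (the address here is illustrative); `substr(5)` strips the leading
// "slave" so that only the ID and address are logged.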
LOG(INFO) << "Mesos agent started on " << string(self()).substr(5);
LOG(INFO) << "Flags at startup: " << flags;
if (self().address.ip.isLoopback()) {
LOG(WARNING) << "\n**************************************************\n"
<< "Agent bound to loopback interface!"
<< " Cannot communicate with remote master(s)."
<< " You might want to set '--ip' flag to a routable"
<< " IP address.\n"
<< "**************************************************";
}
if (flags.registration_backoff_factor > REGISTER_RETRY_INTERVAL_MAX) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.registration_backoff_factor << "'"
<< " for --registration_backoff_factor:"
<< " Must be less than " << REGISTER_RETRY_INTERVAL_MAX;
}
authenticateeName = flags.authenticatee;
// Load credential for agent authentication with the master.
if (flags.credential.isSome()) {
Result<Credential> _credential =
credentials::readCredential(flags.credential.get());
if (_credential.isError()) {
EXIT(EXIT_FAILURE) << _credential.error() << " (see --credential flag)";
} else if (_credential.isNone()) {
EXIT(EXIT_FAILURE)
<< "Empty credential file '" << flags.credential.get() << "'"
<< " (see --credential flag)";
} else {
credential = _credential.get();
LOG(INFO) << "Agent using credential for: "
<< credential->principal();
}
}
Option<Credentials> httpCredentials;
if (flags.http_credentials.isSome()) {
Result<Credentials> credentials =
credentials::read(flags.http_credentials.get());
if (credentials.isError()) {
EXIT(EXIT_FAILURE)
<< credentials.error() << " (see --http_credentials flag)";
} else if (credentials.isNone()) {
EXIT(EXIT_FAILURE)
<< "Credentials file must contain at least one credential"
<< " (see --http_credentials flag)";
}
httpCredentials = credentials.get();
}
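// Determine which HTTP authenticators to load: an explicit
// `--http_authenticators` value wins; otherwise we default to the basic
// HTTP authenticator (plus the JWT authenticator when executor
// authentication is enabled in SSL-enabled builds).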
string httpAuthenticators;
if (flags.http_authenticators.isSome()) {
httpAuthenticators = flags.http_authenticators.get();
#ifdef USE_SSL_SOCKET
} else if (flags.authenticate_http_executors) {
httpAuthenticators =
string(DEFAULT_BASIC_HTTP_AUTHENTICATOR) + "," +
string(DEFAULT_JWT_HTTP_AUTHENTICATOR);
#endif // USE_SSL_SOCKET
} else {
httpAuthenticators = DEFAULT_BASIC_HTTP_AUTHENTICATOR;
}
Option<string> jwtSecretKey;
#ifdef USE_SSL_SOCKET
if (flags.jwt_secret_key.isSome()) {
Try<string> jwtSecretKey_ = os::read(flags.jwt_secret_key.get());
if (jwtSecretKey_.isError()) {
EXIT(EXIT_FAILURE) << "Failed to read the file specified by "
<< "--jwt_secret_key";
}
// TODO(greggomann): Factor the following code out into a common helper,
// since we also do this when loading credentials.
Try<os::Permissions> permissions =
os::permissions(flags.jwt_secret_key.get());
if (permissions.isError()) {
LOG(WARNING) << "Failed to stat jwt secret key file '"
<< flags.jwt_secret_key.get()
<< "': " << permissions.error();
} else if (permissions->others.rwx) {
LOG(WARNING) << "Permissions on executor secret key file '"
<< flags.jwt_secret_key.get()
<< "' are too open; it is recommended that your"
<< " key file is NOT accessible by others";
}
jwtSecretKey = jwtSecretKey_.get();
}
if (flags.authenticate_http_executors) {
if (flags.jwt_secret_key.isNone()) {
EXIT(EXIT_FAILURE) << "--jwt_secret_key must be specified when "
<< "--authenticate_http_executors is set to true";
}
Try<Nothing> result = initializeHttpAuthenticators(
EXECUTOR_HTTP_AUTHENTICATION_REALM,
strings::split(httpAuthenticators, ","),
httpCredentials,
jwtSecretKey);
if (result.isError()) {
EXIT(EXIT_FAILURE) << result.error();
}
}
#endif // USE_SSL_SOCKET
if (flags.authenticate_http_readonly) {
Try<Nothing> result = initializeHttpAuthenticators(
READONLY_HTTP_AUTHENTICATION_REALM,
strings::split(httpAuthenticators, ","),
httpCredentials,
jwtSecretKey);
if (result.isError()) {
EXIT(EXIT_FAILURE) << result.error();
}
}
if (flags.authenticate_http_readwrite) {
Try<Nothing> result = initializeHttpAuthenticators(
READWRITE_HTTP_AUTHENTICATION_REALM,
strings::split(httpAuthenticators, ","),
httpCredentials,
jwtSecretKey);
if (result.isError()) {
EXIT(EXIT_FAILURE) << result.error();
}
}
if ((flags.gc_disk_headroom < 0) || (flags.gc_disk_headroom > 1)) {
EXIT(EXIT_FAILURE)
<< "Invalid value '" << flags.gc_disk_headroom << "'"
<< " for --gc_disk_headroom. Must be between 0.0 and 1.0";
}
Try<Nothing> initialize =
resourceEstimator->initialize(defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to initialize the resource estimator: " << initialize.error();
}
initialize = qosController->initialize(defer(self(), &Self::usage));
if (initialize.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to initialize the QoS Controller: " << initialize.error();
}
// Ensure slave work directory exists.
Try<Nothing> mkdir = os::mkdir(flags.work_dir);
if (mkdir.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to create agent work directory '" << flags.work_dir << "': "
<< mkdir.error();
}
// Create the DiskProfileAdaptor module and set it globally so
// any component that needs the module can share this instance.
Try<DiskProfileAdaptor*> _diskProfileAdaptor =
DiskProfileAdaptor::create(flags.disk_profile_adaptor);
if (_diskProfileAdaptor.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to create disk profile adaptor: "
<< _diskProfileAdaptor.error();
}
diskProfileAdaptor =
shared_ptr<DiskProfileAdaptor>(_diskProfileAdaptor.get());
DiskProfileAdaptor::setAdaptor(diskProfileAdaptor);
string scheme = "http";
#ifdef USE_SSL_SOCKET
if (process::network::openssl::flags().enabled) {
scheme = "https";
}
#endif
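// The endpoint at which local resource providers reach the agent's
// resource provider API, e.g.,
// `https://<ip>:<port>/slave(1)/api/v1/resource_provider`.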
http::URL localResourceProviderURL(
scheme,
self().address.ip,
self().address.port,
self().id + "/api/v1/resource_provider");
Try<Owned<LocalResourceProviderDaemon>> _localResourceProviderDaemon =
LocalResourceProviderDaemon::create(
localResourceProviderURL,
flags,
secretGenerator);
if (_localResourceProviderDaemon.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to create local resource provider daemon: "
<< _localResourceProviderDaemon.error();
}
localResourceProviderDaemon = std::move(_localResourceProviderDaemon.get());
Try<Resources> resources = Containerizer::resources(flags);
if (resources.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to determine agent resources: " << resources.error();
}
// Ensure disk `source`s are accessible.
foreach (
const Resource& resource,
resources->filter([](const Resource& _resource) {
return _resource.has_disk() && _resource.disk().has_source();
})) {
const Resource::DiskInfo::Source& source = resource.disk().source();
switch (source.type()) {
case Resource::DiskInfo::Source::PATH: {
// For `PATH` sources we create them if they do not exist.
CHECK(source.has_path());
if (!source.path().has_root()) {
EXIT(EXIT_FAILURE)
<< "PATH disk root directory is not specified "
<< "'" << resource << "'";
}
Try<Nothing> mkdir = os::mkdir(source.path().root(), true);
if (mkdir.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to create DiskInfo path directory "
<< "'" << source.path().root() << "': " << mkdir.error();
}
break;
}
case Resource::DiskInfo::Source::MOUNT: {
CHECK(source.has_mount());
if (!source.mount().has_root()) {
EXIT(EXIT_FAILURE)
<< "MOUNT disk root directory is not specified "
<< "'" << resource << "'";
}
// For `MOUNT` sources we fail if they don't exist.
// On Linux we test the mount table for existence.
#ifdef __linux__
// Get the `realpath` of the `root` to verify it against the
// mount table entries.
// TODO(jmlvanre): Consider allowing only real paths as opposed to
// symlinks. This would prevent an operator from changing the
// underlying data while the slave's checkpointed `root` still has the
// same value. We could also check the UUID of the underlying block
// device to catch this case.
Result<string> realpath = os::realpath(source.mount().root());
if (!realpath.isSome()) {
EXIT(EXIT_FAILURE)
<< "Failed to determine `realpath` for DiskInfo mount in resource '"
<< resource << "' with path '" << source.mount().root() << "': "
<< (realpath.isError() ? realpath.error() : "no such path");
}
// TODO(jmlvanre): Consider moving this out of the for loop.
Try<fs::MountTable> mountTable = fs::MountTable::read("/proc/mounts");
if (mountTable.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to open mount table to verify mounts: "
<< mountTable.error();
}
bool foundEntry = false;
foreach (const fs::MountTable::Entry& entry, mountTable->entries) {
if (entry.dir == realpath.get()) {
foundEntry = true;
break;
}
}
if (!foundEntry) {
EXIT(EXIT_FAILURE)
<< "Failed to find mount '" << realpath.get()
<< "' in /proc/mounts";
}
#else // __linux__
// On other platforms we test whether the provided `root` exists.
if (!os::exists(source.mount().root())) {
EXIT(EXIT_FAILURE)
<< "Failed to find mount point '" << source.mount().root() << "'";
}
#endif // __linux__
break;
}
case Resource::DiskInfo::Source::BLOCK:
case Resource::DiskInfo::Source::RAW:
case Resource::DiskInfo::Source::UNKNOWN: {
EXIT(EXIT_FAILURE)
<< "Unsupported 'DiskInfo.Source.Type' in '" << resource << "'";
}
}
}
Attributes attributes;
if (flags.attributes.isSome()) {
attributes = Attributes::parse(flags.attributes.get());
}
// Determine our hostname or use the hostname provided.
string hostname;
if (flags.hostname.isNone()) {
if (flags.hostname_lookup) {
Try<string> result = net::getHostname(self().address.ip);
if (result.isError()) {
EXIT(EXIT_FAILURE) << "Failed to get hostname: " << result.error();
}
hostname = result.get();
} else {
// We use the IP address for hostname if the user requested us
// NOT to look it up, and it wasn't explicitly set via --hostname:
hostname = stringify(self().address.ip);
}
} else {
hostname = flags.hostname.get();
}
// Initialize slave info.
info.set_hostname(hostname);
info.set_port(self().address.port);
info.mutable_resources()->CopyFrom(resources.get());
if (HookManager::hooksAvailable()) {
info.mutable_resources()->CopyFrom(
HookManager::slaveResourcesDecorator(info));
}
// Initialize `totalResources` with `info.resources`; checkpointed
// resources will be applied later during recovery.
totalResources = info.resources();
LOG(INFO) << "Agent resources: " << info.resources();
info.mutable_attributes()->CopyFrom(attributes);
if (HookManager::hooksAvailable()) {
info.mutable_attributes()->CopyFrom(
HookManager::slaveAttributesDecorator(info));
}
LOG(INFO) << "Agent attributes: " << info.attributes();
// Checkpointing of slaves is always enabled.
info.set_checkpoint(true);
if (flags.domain.isSome()) {
info.mutable_domain()->CopyFrom(flags.domain.get());
}
LOG(INFO) << "Agent hostname: " << info.hostname();
taskStatusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1)
.operator std::function<void(StatusUpdate)>());
// We pause the status update managers so that they don't forward any updates
// while the agent is still recovering. They are unpaused/resumed when the
// agent (re-)registers with the master.
taskStatusUpdateManager->pause();
operationStatusUpdateManager.pause();
// Start disk monitoring.
// NOTE: We send a delayed message here instead of directly calling
// checkDiskUsage, to make disabling this feature easy (e.g by specifying
// a very large disk_watch_interval).
delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);
// Start image store disk monitoring. Note that image layer garbage
// collection is only enabled if the agent flag `--image_gc_config`
// is set.
// TODO(gilbert): Consider moving the image auto-GC logic into the
// respective containerizers. For now, it is only enabled for the
// Mesos Containerizer.
if (flags.image_gc_config.isSome() &&
flags.image_providers.isSome() &&
strings::contains(flags.containerizers, "mesos")) {
delay(
Nanoseconds(
flags.image_gc_config->image_disk_watch_interval().nanoseconds()),
self(),
&Slave::checkImageDiskUsage);
}
startTime = Clock::now();
// Install protobuf handlers.
install<SlaveRegisteredMessage>(
&Slave::registered,
&SlaveRegisteredMessage::slave_id,
&SlaveRegisteredMessage::connection);
install<SlaveReregisteredMessage>(
&Slave::reregistered,
&SlaveReregisteredMessage::slave_id,
&SlaveReregisteredMessage::reconciliations,
&SlaveReregisteredMessage::connection);
install<RunTaskMessage>(
&Slave::handleRunTaskMessage);
install<RunTaskGroupMessage>(
&Slave::handleRunTaskGroupMessage);
install<KillTaskMessage>(
&Slave::killTask);
install<ShutdownExecutorMessage>(
&Slave::shutdownExecutor,
&ShutdownExecutorMessage::framework_id,
&ShutdownExecutorMessage::executor_id);
install<ShutdownFrameworkMessage>(
&Slave::shutdownFramework,
&ShutdownFrameworkMessage::framework_id);
install<FrameworkToExecutorMessage>(
&Slave::schedulerMessage,
&FrameworkToExecutorMessage::slave_id,
&FrameworkToExecutorMessage::framework_id,
&FrameworkToExecutorMessage::executor_id,
&FrameworkToExecutorMessage::data);
install<UpdateFrameworkMessage>(
&Slave::updateFramework);
install<CheckpointResourcesMessage>(
&Slave::checkpointResourcesMessage,
&CheckpointResourcesMessage::resources);
install<ApplyOperationMessage>(
&Slave::applyOperation);
install<ReconcileOperationsMessage>(
&Slave::reconcileOperations);
install<StatusUpdateAcknowledgementMessage>(
&Slave::statusUpdateAcknowledgement,
&StatusUpdateAcknowledgementMessage::slave_id,
&StatusUpdateAcknowledgementMessage::framework_id,
&StatusUpdateAcknowledgementMessage::task_id,
&StatusUpdateAcknowledgementMessage::uuid);
install<AcknowledgeOperationStatusMessage>(
&Slave::operationStatusAcknowledgement);
install<RegisterExecutorMessage>(
&Slave::registerExecutor,
&RegisterExecutorMessage::framework_id,
&RegisterExecutorMessage::executor_id);
install<ReregisterExecutorMessage>(
&Slave::reregisterExecutor,
&ReregisterExecutorMessage::framework_id,
&ReregisterExecutorMessage::executor_id,
&ReregisterExecutorMessage::tasks,
&ReregisterExecutorMessage::updates);
install<StatusUpdateMessage>(
&Slave::statusUpdate,
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
install<ExecutorToFrameworkMessage>(
&Slave::executorMessage,
&ExecutorToFrameworkMessage::slave_id,
&ExecutorToFrameworkMessage::framework_id,
&ExecutorToFrameworkMessage::executor_id,
&ExecutorToFrameworkMessage::data);
install<ShutdownMessage>(
&Slave::shutdown,
&ShutdownMessage::message);
install<DrainSlaveMessage>(&Slave::drain);
install<PingSlaveMessage>(
&Slave::ping,
&PingSlaveMessage::connected);
// Setup the '/api/v1' handler for streaming requests.
RouteOptions options;
options.requestStreaming = true;
route("/api/v1",
// TODO(benh): Is this authentication realm sufficient or do
// we need some kind of hybrid if we expect both executors
// and operators/tooling to use this endpoint?
READWRITE_HTTP_AUTHENTICATION_REALM,
Http::API_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.api(request, principal);
},
options);
#ifndef __WINDOWS__
if (executorSocket.isSome()) {
// We use `http::Server` to manage the communication channel.
// Since `http::Server` currently doesn't offer support for
// authentication, we inject the requests received by the
// server into the normal agent routing logic.
Try<http::Server> server = http::Server::create(
*executorSocket,
process::defer(
self(),
[this](const process::network::Socket&, http::Request request)
-> Future<http::Response> {
// Restrict access to only allow `/slave(N)/api/v1/executor`
// and `/slave(N)/api/v1`. Executors need to be able to
// access the former to subscribe and the latter to, e.g.,
// launch containers or perform other operator API calls.
string selfPrefix = "/" + self().id;
if (request.url.path != selfPrefix + "/api/v1/executor" &&
request.url.path != selfPrefix + "/api/v1") {
LOG(INFO)
<< "Blocking request for " << request.url.path
<< " over executor socket";
return http::Forbidden();
}
// Create an `HttpEvent` with the needed information which can
// be consumed by the agent. The event contains, e.g., the
// requested path so that the expected route `/api/v1/executor`
// is used when consuming the event.
std::unique_ptr<Promise<http::Response>> promise(
new Promise<http::Response>());
Future<http::Response> response = promise->future();
process::HttpEvent event(
std::unique_ptr<http::Request>(new http::Request(request)),
std::move(promise));
std::move(event).consume(this);
return response;
}),
{
/* .scheme = */ process::http::Scheme::HTTP_UNIX,
/* .backlog = */ 16384,
});
if (server.isError()) {
LOG(FATAL) << "Could not start listening on executor socket: "
<< server.error();
} else {
executorSocketServer = std::move(*server);
Future<Nothing> executorSocketServerTerminated =
executorSocketServer->run();
if (executorSocketServerTerminated.isFailed()) {
LOG(FATAL) << "Could not start listening on executor socket: "
<< executorSocketServerTerminated.failure();
}
}
}
#endif // __WINDOWS__
route("/api/v1/executor",
EXECUTOR_HTTP_AUTHENTICATION_REALM,
Http::EXECUTOR_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.executor(request, principal);
});
route(
"/api/v1/resource_provider",
RESOURCE_PROVIDER_HTTP_AUTHENTICATION_REALM,
Http::RESOURCE_PROVIDER_HELP(),
[this](const http::Request& request, const Option<Principal>& principal)
-> Future<http::Response> {
logRequest(request);
if (resourceProviderManager.get() == nullptr) {
return http::ServiceUnavailable();
}
return resourceProviderManager->api(request, principal);
});
route("/state",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::STATE_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.state(request, principal)
.onReady([request](const process::http::Response& response) {
logResponse(request, response);
});
});
route("/flags",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::FLAGS_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.flags(request, principal);
});
route("/health",
Http::HEALTH_HELP(),
[this](const http::Request& request) {
return http.health(request);
});
route("/monitor/statistics",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::STATISTICS_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.statistics(request, principal);
});
route("/containers",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::CONTAINERS_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.containers(request, principal)
.onReady([request](const process::http::Response& response) {
logResponse(request, response);
});
});
route("/containerizer/debug",
READONLY_HTTP_AUTHENTICATION_REALM,
Http::CONTAINERIZER_DEBUG_HELP(),
[this](const http::Request& request,
const Option<Principal>& principal) {
logRequest(request);
return http.containerizerDebug(request, principal);
});
// TODO(tillt): Use generalized lambda capture once we adopt C++14.
Option<Authorizer*> _authorizer = authorizer;
auto authorize = [_authorizer](const Option<Principal>& principal) {
return authorization::authorizeLogAccess(_authorizer, principal);
};
// Expose the log file for the webui. Fall back to 'log_dir' if
// an explicit file was not specified.
if (flags.external_log_file.isSome()) {
files->attach(
flags.external_log_file.get(), AGENT_LOG_VIRTUAL_PATH, authorize)
.onAny(defer(self(),
&Self::fileAttached,
lambda::_1,
flags.external_log_file.get(),
AGENT_LOG_VIRTUAL_PATH));
} else if (flags.log_dir.isSome()) {
Try<string> log =
logging::getLogFile(logging::getLogSeverity(flags.logging_level));
if (log.isError()) {
LOG(ERROR) << "Agent log file cannot be found: " << log.error();
} else {
files->attach(log.get(), AGENT_LOG_VIRTUAL_PATH, authorize)
.onAny(defer(self(),
&Self::fileAttached,
lambda::_1,
log.get(),
AGENT_LOG_VIRTUAL_PATH));
}
}
// Check that the reconfiguration_policy flag is valid.
if (flags.reconfiguration_policy != "equal" &&
flags.reconfiguration_policy != "additive") {
EXIT(EXIT_FAILURE)
<< "Unknown option for 'reconfiguration_policy' flag "
<< flags.reconfiguration_policy << "."
<< " Please run the agent with '--help' to see the valid options.";
}
// Check that the recover flag is valid.
if (flags.recover != "reconnect" && flags.recover != "cleanup") {
EXIT(EXIT_FAILURE)
<< "Unknown option for 'recover' flag " << flags.recover << "."
<< " Please run the agent with '--help' to see the valid options";
}
auto signalHandler = defer(self(), &Slave::signaled, lambda::_1, lambda::_2)
.operator std::function<void(int, int)>();
#ifdef __WINDOWS__
if (!os::internal::installCtrlHandler(&signalHandler)) {
EXIT(EXIT_FAILURE)
<< "Failed to configure console handlers: " << WindowsError().message;
}
#else
if (os::internal::configureSignal(&signalHandler) < 0) {
EXIT(EXIT_FAILURE)
<< "Failed to configure signal handlers: " << os::strerror(errno);
}
#endif // __WINDOWS__
// Do recovery.
async(&state::recover, metaDir, flags.strict)
.then(defer(self(), &Slave::recover, lambda::_1))
.then(defer(self(), &Slave::_recover))
.onAny(defer(self(), &Slave::__recover, lambda::_1));
}
void Slave::finalize()
{
LOG(INFO) << "Agent terminating";
// NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
// can potentially remove a framework from 'frameworks'.
foreach (const FrameworkID& frameworkId, frameworks.keys()) {
// TODO(benh): Because a shutdown isn't instantaneous (but has
// shutdown/kill phases) we might not actually propagate all
// the status updates appropriately here. Consider providing
// an alternative function which skips the shutdown phase and
// simply does a kill (sending all status updates
// immediately). Of course, this still isn't sufficient
// because those status updates might get lost and we won't
// resend them unless we build that into the system.
// NOTE: We shut down the framework only if it has disabled
// checkpointing. This is because slave recovery tests terminate
// the slave to simulate slave restart.
if (!frameworks[frameworkId]->info.checkpoint()) {
shutdownFramework(UPID(), frameworkId);
}
}
// Explicitly tear down the resource provider manager to ensure that the
// wrapped process is terminated and releases the underlying storage.
resourceProviderManager.reset();
}
void Slave::shutdown(const UPID& from, const string& message)
{
if (from && master != from) {
LOG(WARNING) << "Ignoring shutdown message from " << from
<< " because it is not from the registered master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
if (from) {
LOG(INFO) << "Agent asked to shut down by " << from
<< (message.empty() ? "" : " because '" + message + "'");
} else if (info.has_id()) {
if (message.empty()) {
LOG(INFO) << "Unregistering and shutting down";
} else {
LOG(INFO) << message << "; unregistering and shutting down";
}
UnregisterSlaveMessage message_;
message_.mutable_slave_id()->MergeFrom(info.id());
send(master.get(), message_);
} else {
if (message.empty()) {
LOG(INFO) << "Shutting down";
} else {
LOG(INFO) << message << "; shutting down";
}
}
state = TERMINATING;
if (frameworks.empty()) { // Terminate slave if there are no frameworks.
terminate(self());
} else {
// NOTE: The slave will terminate after all the executors have
// terminated.
// NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
// can potentially remove a framework from 'frameworks'.
foreach (const FrameworkID& frameworkId, frameworks.keys()) {
shutdownFramework(from, frameworkId);
}
}
}
void Slave::drain(
const UPID& from,
DrainSlaveMessage&& drainSlaveMessage)
{
if (operations.empty() && frameworks.empty()) {
LOG(INFO)
<< "Received DrainConfig " << drainSlaveMessage.config()
<< (drainConfig.isSome()
? "; previously stored DrainConfig " + stringify(*drainConfig)
: "")
<< "; agent has no stored frameworks, tasks, or operations,"
" so draining is already complete";
return;
}
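// Collect the pending, queued, and launched tasks per framework so
// that the scope of the drain can be logged below.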
hashmap<FrameworkID, hashset<TaskID>> pendingTaskIds;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (const auto& taskMap, framework->pendingTasks) {
pendingTaskIds[framework->id()] = taskMap.keys();
}
}
hashmap<FrameworkID, hashset<TaskID>> queuedTaskIds;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
foreachkey (const TaskID& taskId, executor->queuedTasks) {
queuedTaskIds[framework->id()].insert(taskId);
}
}
}
hashmap<FrameworkID, hashset<TaskID>> launchedTaskIds;
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
foreachkey (const TaskID& taskId, executor->launchedTasks) {
launchedTaskIds[framework->id()].insert(taskId);
}
}
}
LOG(INFO)
<< "Initiating drain with DrainConfig " << drainSlaveMessage.config()
<< (drainConfig.isSome()
? "; overwriting previous DrainConfig " + stringify(*drainConfig)
: "")
<< "; agent has (pending tasks, queued tasks, launched tasks, operations)"
<< " == ("
<< stringify(pendingTaskIds) << ", "
<< stringify(queuedTaskIds) << ", "
<< stringify(launchedTaskIds) << ", "
<< stringify(operations.keys()) << ")";
CHECK_SOME(state::checkpoint(
paths::getDrainConfigPath(metaDir, info.id()),
drainSlaveMessage.config()))
<< "Failed to checkpoint DrainConfig";
drainConfig = drainSlaveMessage.config();
estimatedDrainStartTime = Clock::now();
const Option<DurationInfo> maxGracePeriod =
drainConfig->has_max_grace_period()
? drainConfig->max_grace_period()
: Option<DurationInfo>::none();
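// Computes the kill policy override to apply while draining: if a
// maximum grace period is configured and the task either has no kill
// policy or one whose grace period exceeds the maximum, an override
// capped at `max_grace_period` is returned; otherwise `None()` is
// returned and the task's own kill policy applies.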
auto calculateKillPolicy =
[&](const Option<KillPolicy>& killPolicy) -> Option<KillPolicy> {
if (maxGracePeriod.isNone()) {
return None();
}
KillPolicy killPolicyOverride;
killPolicyOverride.mutable_grace_period()->CopyFrom(maxGracePeriod.get());
// Task kill policy is not set or unknown.
if (killPolicy.isNone() || !killPolicy->has_grace_period()) {
return killPolicyOverride;
}
// The task's kill policy grace period is greater than the override.
if (maxGracePeriod.get() < killPolicy->grace_period()) {
return killPolicyOverride;
}
return None();
};
// Frameworks may be removed within `kill()` or `killPendingTask()` below,
// so we must copy them and their members before looping.
foreachvalue (Framework* framework, utils::copy(frameworks)) {
typedef hashmap<TaskID, TaskInfo> TaskMap;
foreachvalue (const TaskMap& tasks, utils::copy(framework->pendingTasks)) {
foreachvalue (const TaskInfo& task, tasks) {
killPendingTask(framework->id(), framework, task.task_id());
}
}
foreachvalue (Executor* executor, utils::copy(framework->executors)) {
foreachvalue (Task* task, executor->launchedTasks) {
kill(framework->id(),
framework,
executor,
task->task_id(),
calculateKillPolicy(
task->has_kill_policy()
? task->kill_policy()
: Option<KillPolicy>::none()));
}
foreachvalue (const TaskInfo& task, utils::copy(executor->queuedTasks)) {
kill(framework->id(),
framework,
executor,
task.task_id(),
calculateKillPolicy(
task.has_kill_policy()
? task.kill_policy()
: Option<KillPolicy>::none()));
}
}
}
}
void Slave::fileAttached(
const Future<Nothing>& result,
const string& path,
const string& virtualPath)
{
if (result.isReady()) {
VLOG(1) << "Successfully attached '" << path << "'"
<< " to virtual path '" << virtualPath << "'";
} else {
LOG(ERROR) << "Failed to attach '" << path << "'"
<< " to virtual path '" << virtualPath << "': "
<< (result.isFailed() ? result.failure() : "discarded");
}
}
// TODO(vinod/bmahler): Get rid of this helper.
Nothing Slave::detachFile(const string& path)
{
files->detach(path);
return Nothing();
}
void Slave::attachTaskVolumeDirectory(
const ExecutorInfo& executorInfo,
const ContainerID& executorContainerId,
const Task& task)
{
CHECK(executorInfo.has_type() &&
executorInfo.type() == ExecutorInfo::DEFAULT);
CHECK_EQ(task.executor_id(), executorInfo.executor_id());
// This handles the case where the task has disk resources specified.
foreach (const Resource& resource, task.resources()) {
// Ignore if there are no disk resources or if the
// disk resources did not specify a volume mapping.
if (!resource.has_disk() || !resource.disk().has_volume()) {
continue;
}
const Volume& volume = resource.disk().volume();
const string executorRunPath = paths::getExecutorRunPath(
flags.work_dir,
info.id(),
task.framework_id(),
task.executor_id(),
executorContainerId);
const string executorDirectoryPath =
path::join(executorRunPath, volume.container_path());
const string taskPath = paths::getTaskPath(
flags.work_dir,
info.id(),
task.framework_id(),
task.executor_id(),
executorContainerId,
task.task_id());
const string taskDirectoryPath =
path::join(taskPath, volume.container_path());
files->attach(executorDirectoryPath, taskDirectoryPath)
.onAny(defer(
self(),
&Self::fileAttached,
lambda::_1,
executorDirectoryPath,
taskDirectoryPath));
}
// This handles the case where the executor has disk resources specified
// and the task's ContainerInfo has a `SANDBOX_PATH` volume with type
// `PARENT` to share the executor's disk volume.
hashset<string> executorContainerPaths;
foreach (const Resource& resource, executorInfo.resources()) {
// Ignore if there are no disk resources or if the
// disk resources did not specify a volume mapping.
if (!resource.has_disk() || !resource.disk().has_volume()) {
continue;
}
const Volume& volume = resource.disk().volume();
executorContainerPaths.insert(volume.container_path());
}
if (executorContainerPaths.empty()) {
return;
}
if (task.has_container()) {
foreach (const Volume& volume, task.container().volumes()) {
if (!volume.has_source() ||
volume.source().type() != Volume::Source::SANDBOX_PATH) {
continue;
}
CHECK(volume.source().has_sandbox_path());
const Volume::Source::SandboxPath& sandboxPath =
volume.source().sandbox_path();
if (sandboxPath.type() != Volume::Source::SandboxPath::PARENT) {
continue;
}
if (!executorContainerPaths.contains(sandboxPath.path())) {
continue;
}
const string executorRunPath = paths::getExecutorRunPath(
flags.work_dir,
info.id(),
task.framework_id(),
task.executor_id(),
executorContainerId);
const string executorDirectoryPath =
path::join(executorRunPath, sandboxPath.path());
const string taskPath = paths::getTaskPath(
flags.work_dir,
info.id(),
task.framework_id(),
task.executor_id(),
executorContainerId,
task.task_id());
const string taskDirectoryPath =
path::join(taskPath, volume.container_path());
files->attach(executorDirectoryPath, taskDirectoryPath)
.onAny(defer(
self(),
&Self::fileAttached,
lambda::_1,
executorDirectoryPath,
taskDirectoryPath));
}
}
}
void Slave::detachTaskVolumeDirectories(
const ExecutorInfo& executorInfo,
const ContainerID& executorContainerId,
const vector<Task>& tasks)
{
// NOTE: If the executor is not a default executor, this function will
// still be called but with an empty list of tasks.
CHECK(tasks.empty() ||
(executorInfo.has_type() &&
executorInfo.type() == ExecutorInfo::DEFAULT));
hashset<string> executorContainerPaths;
foreach (const Resource& resource, executorInfo.resources()) {
// Ignore if there are no disk resources or if the
// disk resources did not specify a volume mapping.
if (!resource.has_disk() || !resource.disk().has_volume()) {
continue;
}
const Volume& volume = resource.disk().volume();
executorContainerPaths.insert(volume.container_path());
}
foreach (const Task& task, tasks) {
CHECK_EQ(task.executor_id(), executorInfo.executor_id());
// This handles the case where the task has disk resources specified.
foreach (const Resource& resource, task.resources()) {
// Ignore if there are no disk resources or if the
// disk resources did not specify a volume mapping.
if (!resource.has_disk() || !resource.disk().has_volume()) {
continue;
}
const Volume& volume = resource.disk().volume();
const string taskPath = paths::getTaskPath(
flags.work_dir,
info.id(),
task.framework_id(),
task.executor_id(),
executorContainerId,
task.task_id());
const string taskDirectoryPath =
path::join(taskPath, volume.container_path());
files->detach(taskDirectoryPath);
}
if (executorContainerPaths.empty()) {
continue;
}
// This handles the case where the executor has disk resources specified
// and the task's ContainerInfo has a `SANDBOX_PATH` volume with type
// `PARENT` to share the executor's disk volume.
if (task.has_container()) {
foreach (const Volume& volume, task.container().volumes()) {
if (!volume.has_source() ||
volume.source().type() != Volume::Source::SANDBOX_PATH) {
continue;
}
CHECK(volume.source().has_sandbox_path());
const Volume::Source::SandboxPath& sandboxPath =
volume.source().sandbox_path();
if (sandboxPath.type() != Volume::Source::SandboxPath::PARENT) {
continue;
}
if (!executorContainerPaths.contains(sandboxPath.path())) {
continue;
}
const string taskPath = paths::getTaskPath(
flags.work_dir,
info.id(),
task.framework_id(),
task.executor_id(),
executorContainerId,
task.task_id());
const string taskDirectoryPath =
path::join(taskPath, volume.container_path());
files->detach(taskDirectoryPath);
}
}
}
}
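// Invoked by the master detector whenever leader detection completes:
// with the newly elected master, with `None` when the leading master
// is lost, or with a failed/discarded future.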
void Slave::detected(const Future<Option<MasterInfo>>& _master)
{
CHECK(state == DISCONNECTED ||
state == RUNNING ||
state == TERMINATING) << state;
if (state != TERMINATING) {
state = DISCONNECTED;
}
// Pause the status updates.
taskStatusUpdateManager->pause();
operationStatusUpdateManager.pause();
if (_master.isFailed()) {
EXIT(EXIT_FAILURE) << "Failed to detect a master: " << _master.failure();
}
Option<MasterInfo> latest;
if (_master.isDiscarded()) {
LOG(INFO) << "Re-detecting master";
latest = None();
master = None();
} else if (_master->isNone()) {
LOG(INFO) << "Lost leading master";
latest = None();
master = None();
} else {
latest = _master.get();
master = UPID(latest->pid());
LOG(INFO) << "New master detected at " << master.get();
// Cancel the pending registration timer to avoid spurious attempts
// at reregistration. `Clock::cancel` is idempotent, so this call
// is safe even if no timer is active or pending.
Clock::cancel(agentRegistrationTimer);
if (state == TERMINATING) {
LOG(INFO) << "Skipping registration because agent is terminating";
return;
}
if (requiredMasterCapabilities.agentUpdate) {
protobuf::master::Capabilities masterCapabilities(
latest->capabilities());
if (!masterCapabilities.agentUpdate) {
EXIT(EXIT_FAILURE) <<
"Agent state changed on restart, but the detected master lacks the "
"AGENT_UPDATE capability. Refusing to connect.";
return;
}
if (dynamic_cast<mesos::master::detector::StandaloneMasterDetector*>(
detector)) {
LOG(WARNING) <<
"The AGENT_UPDATE master capability is required, "
"but the StandaloneMasterDetector does not have the ability to read "
"master capabilities.";
}
}
// Wait for a random amount of time before authentication or
// registration.
//
// TODO(mzhu): Specialize this for authentication.
Duration duration =
flags.registration_backoff_factor * ((double) os::random() / RAND_MAX);
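// For example, `--registration_backoff_factor=1secs` yields a delay
// drawn uniformly from [0s, 1s].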
if (credential.isSome()) {
// Authenticate with the master.
// TODO(vinod): Consider adding an "AUTHENTICATED" state to the
// slave instead of "authenticate" variable.
Duration maxTimeout = flags.authentication_timeout_min +
flags.authentication_backoff_factor * 2;
delay(
duration,
self(),
&Slave::authenticate,
flags.authentication_timeout_min,
std::min(maxTimeout, flags.authentication_timeout_max));
} else {
// Proceed with registration without authentication.
LOG(INFO) << "No credentials provided."
<< " Attempting to register without authentication";
delay(duration,
self(),
&Slave::doReliableRegistration,
flags.registration_backoff_factor * 2); // Backoff.
}
}
// Keep detecting masters.
LOG(INFO) << "Detecting new master";
detection = detector->detect(latest)
.onAny(defer(self(), &Slave::detected, lambda::_1));
}
void Slave::authenticate(Duration minTimeout, Duration maxTimeout)
{
authenticated = false;
if (master.isNone()) {
return;
}
if (authenticating.isSome()) {
// Authentication is in progress. Try to cancel it.
// Note that it is possible that 'authenticating' is ready
// and the dispatch to '_authenticate' is enqueued when we
// are here, making the 'discard' here a no-op. This is ok
// because we set 'reauthenticate' here which enforces a retry
// in '_authenticate'.
Future<bool> authenticating_ = authenticating.get();
authenticating_.discard();
reauthenticate = true;
return;
}
LOG(INFO) << "Authenticating with master " << master.get();
// Ensure there is a link to the master before we start
// communicating with it.
link(master.get());
CHECK(authenticatee == nullptr);
if (authenticateeName == DEFAULT_AUTHENTICATEE) {
LOG(INFO) << "Using default CRAM-MD5 authenticatee";
authenticatee = new cram_md5::CRAMMD5Authenticatee();
}
if (authenticatee == nullptr) {
Try<Authenticatee*> module =
modules::ModuleManager::create<Authenticatee>(authenticateeName);
if (module.isError()) {
EXIT(EXIT_FAILURE)
<< "Could not create authenticatee module '"
<< authenticateeName << "': " << module.error();
}
LOG(INFO) << "Using '" << authenticateeName << "' authenticatee";
authenticatee = module.get();
}
CHECK_SOME(credential);
// We pick a random duration between `minTimeout` and `maxTimeout`.
Duration timeout =
minTimeout + (maxTimeout - minTimeout) * ((double)os::random() / RAND_MAX);
authenticating =
authenticatee->authenticate(master.get(), self(), credential.get())
.onAny(defer(self(), &Self::_authenticate, minTimeout, maxTimeout))
.after(timeout, [](Future<bool> future) {
// NOTE: Discarded future results in a retry in '_authenticate()'.
// This is a no-op if the future is already ready.
if (future.discard()) {
LOG(WARNING) << "Authentication timed out";
}
return future;
});
}
void Slave::_authenticate(
Duration currentMinTimeout, Duration currentMaxTimeout)
{
delete CHECK_NOTNULL(authenticatee);
authenticatee = nullptr;
CHECK_SOME(authenticating);
const Future<bool>& future = authenticating.get();
if (master.isNone()) {
LOG(INFO) << "Ignoring _authenticate because the master is lost";
authenticating = None();
// Set it to false because we do not want further retries until
// a new master is detected.
// We do not need to reauthenticate either, even if
// 'reauthenticate' is currently true, because the master is
// lost.
reauthenticate = false;
return;
}
if (reauthenticate || !future.isReady()) {
LOG(WARNING)
<< "Failed to authenticate with master " << master.get() << ": "
<< (reauthenticate ? "master changed" :
(future.isFailed() ? future.failure() : "future discarded"));
authenticating = None();
reauthenticate = false;
// Grow the timeout range using exponential backoff:
//
// [min, min + factor * 2^0]
// [min, min + factor * 2^1]
// ...
// [min, min + factor * 2^N]
// ...
// [min, max] // Stop at max.
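//
// For example (hypothetical flag values), with
// `--authentication_timeout_min=5secs`,
// `--authentication_backoff_factor=1secs`, and
// `--authentication_timeout_max=1mins`, the successive ranges are
// [5s, 7s], [5s, 9s], [5s, 13s], ..., capped at [5s, 1min].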
Duration maxTimeout =
currentMinTimeout + (currentMaxTimeout - currentMinTimeout) * 2;
authenticate(
currentMinTimeout,
std::min(maxTimeout, flags.authentication_timeout_max));
return;
}
if (!future.get()) {
// For refused authentication, we exit instead of doing a shutdown
// to keep possibly active executors running.
EXIT(EXIT_FAILURE)
<< "Master " << master.get() << " refused authentication";
}
LOG(INFO) << "Successfully authenticated with master " << master.get();
authenticated = true;
authenticating = None();
// Proceed with registration.
doReliableRegistration(flags.registration_backoff_factor * 2);
}
void Slave::registered(
const UPID& from,
const SlaveID& slaveId,
const MasterSlaveConnection& connection)
{
if (master != from) {
LOG(WARNING) << "Ignoring registration message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
CHECK_SOME(master);
if (connection.has_total_ping_timeout_seconds()) {
masterPingTimeout =
Seconds(static_cast<int64_t>(connection.total_ping_timeout_seconds()));
} else {
masterPingTimeout = DEFAULT_MASTER_PING_TIMEOUT();
}
switch (state) {
case DISCONNECTED: {
LOG(INFO) << "Registered with master " << master.get()
<< "; given agent ID " << slaveId;
state = RUNNING;
// Cancel the pending registration timer to avoid spurious attempts
// at reregistration. `Clock::cancel` is idempotent, so this call
// is safe even if no timer is active or pending.
Clock::cancel(agentRegistrationTimer);
taskStatusUpdateManager->resume(); // Resume status updates.
info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
// Create the slave meta directory.
paths::createSlaveDirectory(metaDir, slaveId);
// Initialize and resume the operation status update manager.
//
// NOTE: There is no need to recover the operation status update manager,
// because its streams are checkpointed within the slave meta directory
// which was just created.
operationStatusUpdateManager.initialize(
defer(self(), &Self::sendOperationStatusUpdate, lambda::_1),
std::bind(
&slave::paths::getSlaveOperationUpdatesPath,
metaDir,
info.id(),
lambda::_1));
operationStatusUpdateManager.resume();
// Checkpoint slave info.
const string path = paths::getSlaveInfoPath(metaDir, slaveId);
VLOG(1) << "Checkpointing SlaveInfo to '" << path << "'";
CHECK_SOME(state::checkpoint(path, info));
// If we registered with this agent ID for the first time, initialize
// the resource provider manager with it; if the manager was already
// initialized with a recovered agent ID, this is a no-op.
initializeResourceProviderManager(flags, info.id());
// We start the local resource providers daemon once the agent is
// running, so the resource providers can use the agent API.
localResourceProviderDaemon->start(info.id());
// Setup a timer so that the agent attempts to reregister if it
// doesn't receive a ping from the master for an extended period
// of time. This needs to be done once registered, in case we
// never receive an initial ping.
Clock::cancel(pingTimer);
pingTimer = delay(
masterPingTimeout,
self(),
&Slave::pingTimeout,
detection);
break;
}
case RUNNING:
// Already registered!
if (info.id() != slaveId) {
EXIT(EXIT_FAILURE)
<< "Registered but got wrong id: " << slaveId
<< " (expected: " << info.id() << "). Committing suicide";
}
LOG(WARNING) << "Already registered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring registration because agent is terminating";
break;
case RECOVERING:
default:
LOG(FATAL) << "Unexpected agent state " << state;
break;
}
// If this agent can support resource providers or has had any oversubscribed
// resources set, send an `UpdateSlaveMessage` to the master to inform it of
// possible changes between completion of recovery and agent registration.
if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
UpdateSlaveMessage message = generateUpdateSlaveMessage();
LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
send(master.get(), message);
}
}
void Slave::reregistered(
const UPID& from,
const SlaveID& slaveId,
const vector<ReconcileTasksMessage>& reconciliations,
const MasterSlaveConnection& connection)
{
if (master != from) {
LOG(WARNING) << "Ignoring re-registration message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
CHECK_SOME(master);
if (info.id() != slaveId) {
EXIT(EXIT_FAILURE)
<< "Re-registered but got wrong id: " << slaveId
<< " (expected: " << info.id() << "). Committing suicide";
}
if (connection.has_total_ping_timeout_seconds()) {
masterPingTimeout =
Seconds(static_cast<int64_t>(connection.total_ping_timeout_seconds()));
} else {
masterPingTimeout = DEFAULT_MASTER_PING_TIMEOUT();
}
switch (state) {
case DISCONNECTED:
LOG(INFO) << "Re-registered with master " << master.get();
state = RUNNING;
taskStatusUpdateManager->resume(); // Resume status updates.
operationStatusUpdateManager.resume();
// We start the local resource providers daemon once the agent is
// running, so the resource providers can use the agent API.
localResourceProviderDaemon->start(info.id());
// Setup a timer so that the agent attempts to reregister if it
// doesn't receive a ping from the master for an extended period
// of time. This needs to be done once reregistered, in case we
// never receive an initial ping.
Clock::cancel(pingTimer);
pingTimer = delay(
masterPingTimeout,
self(),
&Slave::pingTimeout,
detection);
break;
case RUNNING:
LOG(WARNING) << "Already reregistered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring re-registration because agent is terminating";
return;
case RECOVERING:
// It's possible to receive a message intended for the previous
// run of the slave here. Short term we can leave this as is and
// crash in this case. Ideally responses can be tied to a
// particular run of the slave, see:
// https://issues.apache.org/jira/browse/MESOS-676
// https://issues.apache.org/jira/browse/MESOS-677
default:
LOG(FATAL) << "Unexpected agent state " << state;
return;
}
// If this agent can support resource providers or has had any oversubscribed
// resources set, send an `UpdateSlaveMessage` to the master to inform it of
// possible changes between completion of recovery and agent registration.
if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
UpdateSlaveMessage message = generateUpdateSlaveMessage();
LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
send(master.get(), message);
}
// Reconcile any tasks per the master's request.
foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
Framework* framework = getFramework(reconcile.framework_id());
foreach (const TaskStatus& status, reconcile.statuses()) {
const TaskID& taskId = status.task_id();
bool known = false;
if (framework != nullptr) {
known = framework->hasTask(taskId);
}
// Send a terminal status update for each task that is known to
// the master but not known to the agent. This ensures that the
// master will clean up any state associated with the task, which
// is not running. We send TASK_DROPPED to partition-aware
// frameworks; frameworks that are not partition-aware are sent
// TASK_LOST for backward compatibility.
//
// If the task is known to the agent, we don't need to send a
// status update to the master: because the master already knows
// about the task, any subsequent status updates will be
// propagated correctly.
if (!known) {
// NOTE: The `framework` field of the `ReconcileTasksMessage`
// is only set by masters running Mesos 1.1.0 or later. If the
// field is unset, we assume the framework is not partition-aware.
mesos::TaskState taskState = TASK_LOST;
if (reconcile.has_framework() &&
protobuf::frameworkHasCapability(
reconcile.framework(),
FrameworkInfo::Capability::PARTITION_AWARE)) {
taskState = TASK_DROPPED;
}
LOG(WARNING) << "Agent reconciling task " << taskId
<< " of framework " << reconcile.framework_id()
<< " in state " << taskState
<< ": task unknown to the agent";
const StatusUpdate update = protobuf::createStatusUpdate(
reconcile.framework_id(),
info.id(),
taskId,
taskState,
TaskStatus::SOURCE_SLAVE,
id::UUID::random(),
"Reconciliation: task unknown to the agent",
TaskStatus::REASON_RECONCILIATION);
// NOTE: We can't use statusUpdate() here because it drops
// updates for unknown frameworks.
taskStatusUpdateManager->update(update, info.id())
.onAny(defer(self(),
&Slave::___statusUpdate,
lambda::_1,
update,
UPID()));
}
}
}
}
void Slave::doReliableRegistration(Duration maxBackoff)
{
if (master.isNone()) {
LOG(INFO) << "Skipping registration because no master present";
return;
}
if (credential.isSome() && !authenticated) {
LOG(INFO) << "Skipping registration because not authenticated";
return;
}
if (state == RUNNING) { // Slave (re-)registered with the master.
return;
}
if (state == TERMINATING) {
LOG(INFO) << "Skipping registration because agent is terminating";
return;
}
CHECK(state == DISCONNECTED) << state;
CHECK_NE("cleanup", flags.recover);
// Ensure there is a link to the master before we start
// communicating with it. We want to link after the initial
// registration backoff in order to avoid all of the agents
// establishing connections with the master at once.
// See MESOS-5330.
link(master.get());
if (!info.has_id()) {
// Registering for the first time.
RegisterSlaveMessage message;
message.set_version(MESOS_VERSION);
message.mutable_slave()->CopyFrom(info);
message.mutable_agent_capabilities()->CopyFrom(
capabilities.toRepeatedPtrField());
// Include checkpointed resources.
message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
message.mutable_resource_version_uuid()->CopyFrom(resourceVersion);
// If the `Try` from `downgradeResources` returns an `Error`, we currently
// continue to send the resources to the master in a partially downgraded
// state. This implies that an agent with refined reservations cannot work
// with versions of master before reservation refinement support, which was
// introduced in 1.4.0.
//
// TODO(mpark): Do something smarter with the result once something
// like a master capability is introduced.
downgradeResources(&message);
send(master.get(), message);
} else {
// Re-registering, so send tasks running.
ReregisterSlaveMessage message;
message.set_version(MESOS_VERSION);
message.mutable_agent_capabilities()->CopyFrom(
capabilities.toRepeatedPtrField());
// Include checkpointed resources.
message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
message.mutable_resource_version_uuid()->CopyFrom(resourceVersion);
message.mutable_slave()->CopyFrom(info);
foreachvalue (Framework* framework, frameworks) {
message.add_frameworks()->CopyFrom(framework->info);
// TODO(bmahler): We need to send the executors for these
// pending tasks, and we need to send exited events if they
// cannot be launched, see MESOS-1715, MESOS-1720, MESOS-1800.
typedef hashmap<TaskID, TaskInfo> TaskMap;
foreachvalue (const TaskMap& tasks, framework->pendingTasks) {
foreachvalue (const TaskInfo& task, tasks) {
message.add_tasks()->CopyFrom(protobuf::createTask(
task, TASK_STAGING, framework->id()));
}
}
foreachvalue (Executor* executor, framework->executors) {
// Add launched, terminated, and queued tasks.
// Note that terminated executors will only have terminated
// unacknowledged tasks.
        // Note that for each task, the latest state and the status
        // update state (if any) are also included.
foreachvalue (Task* task, executor->launchedTasks) {
message.add_tasks()->CopyFrom(*task);
}
foreachvalue (Task* task, executor->terminatedTasks) {
message.add_tasks()->CopyFrom(*task);
}
foreachvalue (const TaskInfo& task, executor->queuedTasks) {
message.add_tasks()->CopyFrom(protobuf::createTask(
task, TASK_STAGING, framework->id()));
}
// Do not reregister with Command (or Docker) Executors
// because the master doesn't store them; they are generated
// by the slave.
if (!executor->isGeneratedForCommandTask()) {
// Ignore terminated executors because they do not consume
// any resources.
if (executor->state != Executor::TERMINATED) {
ExecutorInfo* executorInfo = message.add_executor_infos();
executorInfo->MergeFrom(executor->info);
            // The scheduler driver ensures that the framework id is set
            // in ExecutorInfo, effectively making it a required field.
CHECK(executorInfo->has_framework_id());
}
}
}
}
// Add completed frameworks.
foreachvalue (const Owned<Framework>& completedFramework,
completedFrameworks) {
VLOG(1) << "Reregistering completed framework "
<< completedFramework->id();
Archive::Framework* completedFramework_ =
message.add_completed_frameworks();
completedFramework_->mutable_framework_info()->CopyFrom(
completedFramework->info);
if (completedFramework->pid.isSome()) {
completedFramework_->set_pid(completedFramework->pid.get());
}
foreach (const Owned<Executor>& executor,
completedFramework->completedExecutors) {
VLOG(2) << "Reregistering completed executor '" << executor->id
<< "' with " << executor->terminatedTasks.size()
<< " terminated tasks, " << executor->completedTasks.size()
<< " completed tasks";
foreachvalue (const Task* task, executor->terminatedTasks) {
VLOG(2) << "Reregistering terminated task " << task->task_id();
completedFramework_->add_tasks()->CopyFrom(*task);
}
foreach (const shared_ptr<Task>& task, executor->completedTasks) {
VLOG(2) << "Reregistering completed task " << task->task_id();
completedFramework_->add_tasks()->CopyFrom(*task);
}
}
}
    // If `downgradeResources` returns an `Error`, we currently continue
    // to send the resources to the master in a partially downgraded
    // state. This implies that an agent with refined reservations cannot
    // work with masters that predate reservation refinement support,
    // which was introduced in 1.4.0.
//
// TODO(mpark): Do something smarter with the result once something
// like a master capability is introduced.
downgradeResources(&message);
CHECK_SOME(master);
send(master.get(), message);
}
// Bound the maximum backoff by 'REGISTER_RETRY_INTERVAL_MAX'.
maxBackoff = std::min(maxBackoff, REGISTER_RETRY_INTERVAL_MAX);
// Determine the delay for next attempt by picking a random
// duration between 0 and 'maxBackoff'.
Duration delay = maxBackoff * ((double) os::random() / RAND_MAX);
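  // NOTE: Combined with the doubling of 'maxBackoff' below, this
  // implements randomized exponential backoff: with an initial bound
  // of b, successive retries fall uniformly within windows of at most
  // b, 2b, 4b, ..., capped at 'REGISTER_RETRY_INTERVAL_MAX'. The
  // jitter prevents a fleet of agents from retrying in lockstep.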
VLOG(1) << "Will retry registration in " << delay << " if necessary";
// Backoff.
agentRegistrationTimer = process::delay(
delay,
self(),
&Slave::doReliableRegistration,
maxBackoff * 2);
}
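

// Dispatches a 'RunTaskMessage' from the master to 'runTask()',
// unpacking the message fields into its arguments.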
void Slave::handleRunTaskMessage(
const UPID& from,
RunTaskMessage&& runTaskMessage)
{
runTask(
from,
runTaskMessage.framework(),
runTaskMessage.framework_id(),
runTaskMessage.pid(),
runTaskMessage.task(),
google::protobuf::convert(runTaskMessage.resource_version_uuids()),
runTaskMessage.has_launch_executor() ?
Option<bool>(runTaskMessage.launch_executor()) : None());
}
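

// Handles a request to launch a single task: validates that the
// message came from the expected master and carries a framework ID,
// obtains the (possibly agent-generated) executor for the task, and
// forwards everything to 'run()'.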
// TODO(vinod): Instead of crashing the slave on checkpoint errors,
// send TASK_LOST to the framework.
void Slave::runTask(
const UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
const UPID& pid,
const TaskInfo& task,
const vector<ResourceVersionUUID>& resourceVersionUuids,
const Option<bool>& launchExecutor)
{
CHECK_NE(task.has_executor(), task.has_command())
<< "Task " << task.task_id()
<< " should have either CommandInfo or ExecutorInfo set but not both";
if (master != from) {
LOG(WARNING) << "Ignoring run task message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? stringify(master.get()) : "None");
return;
}
if (!frameworkInfo.has_id()) {
LOG(ERROR) << "Ignoring run task message from " << from
<< " because it does not have a framework ID";
return;
}
const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
bool executorGeneratedForCommandTask = !task.has_executor();
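  // NOTE: If the task only carries a 'CommandInfo', then
  // 'getExecutorInfo()' above generated a command (or Docker) executor
  // for it. We track this because such executors are not reregistered
  // with the master (see 'doReliableRegistration()').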
run(frameworkInfo,
executorInfo,
task,
None(),
resourceVersionUuids,
pid,
launchExecutor,
executorGeneratedForCommandTask);
}
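

// Checks that per-task resource limits are only used where the agent
// can enforce them: for tasks launched in Mesos containers, a 'cpus'
// limit requires the 'cgroups/cpu' (or 'cgroups/all') isolator and a
// 'mem' limit requires 'cgroups/mem' (or 'cgroups/all'). Tasks
// launched in Docker containers are exempt from this check.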
Option<Error> Slave::validateResourceLimitsAndIsolators(
const vector<TaskInfo>& tasks)
{
foreach (const TaskInfo& task, tasks) {
if (!(task.has_container() &&
task.container().type() == ContainerInfo::DOCKER)) {
if (task.limits().count("cpus") &&
!(strings::contains(flags.isolation, "cgroups/cpu") ||
strings::contains(flags.isolation, "cgroups/all"))) {
return Error(
"CPU limits can only be set on tasks launched in Mesos containers"
" when the agent has loaded the 'cgroups/cpu' isolator");
}
if (task.limits().count("mem") &&
!(strings::contains(flags.isolation, "cgroups/mem") ||
strings::contains(flags.isolation, "cgroups/all"))) {
return Error(
"Memory limits can only be set on tasks launched in Mesos"
" containers when the agent has loaded the 'cgroups/mem' isolator");
}
}
}
return None();
}
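

// Starts the launch path for a task or task group: injects
// 'AllocationInfo' into the resources, upgrades them, validates
// resource limits against the loaded isolators, tracks the launch as
// pending, and unschedules the relevant framework and executor
// directories from GC before chaining into '_run()'.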
void Slave::run(
const FrameworkInfo& frameworkInfo,
ExecutorInfo executorInfo,
Option<TaskInfo> task,
Option<TaskGroupInfo> taskGroup,
const vector<ResourceVersionUUID>& resourceVersionUuids,
const UPID& pid,
const Option<bool>& launchExecutor,
bool executorGeneratedForCommandTask)
{
CHECK_NE(task.isSome(), taskGroup.isSome())
<< "Either task or task group should be set but not both";
auto injectAllocationInfo = [](
RepeatedPtrField<Resource>* resources,
const FrameworkInfo& frameworkInfo) {
set<string> roles = protobuf::framework::getRoles(frameworkInfo);
foreach (Resource& resource, *resources) {
if (!resource.has_allocation_info()) {
if (roles.size() != 1) {
LOG(FATAL) << "Missing 'Resource.AllocationInfo' for resources"
<< " allocated to MULTI_ROLE framework"
<< " '" << frameworkInfo.name() << "'";
}
resource.mutable_allocation_info()->set_role(*roles.begin());
}
}
};
injectAllocationInfo(executorInfo.mutable_resources(), frameworkInfo);
upgradeResources(&executorInfo);
if (task.isSome()) {
injectAllocationInfo(task->mutable_resources(), frameworkInfo);
if (task->has_executor()) {
injectAllocationInfo(
task->mutable_executor()->mutable_resources(),
frameworkInfo);
}
upgradeResources(&task.get());
}
if (taskGroup.isSome()) {
foreach (TaskInfo& task, *taskGroup->mutable_tasks()) {
injectAllocationInfo(task.mutable_resources(), frameworkInfo);
if (task.has_executor()) {
injectAllocationInfo(
task.mutable_executor()->mutable_resources(),
frameworkInfo);
}
}
upgradeResources(&taskGroup.get());
}
vector<TaskInfo> tasks;
if (task.isSome()) {
tasks.push_back(task.get());
} else {
foreach (const TaskInfo& task, taskGroup->tasks()) {
tasks.push_back(task);
}
}
const FrameworkID& frameworkId = frameworkInfo.id();
LOG(INFO) << "Got assigned " << taskOrTaskGroup(task, taskGroup)
<< " for framework " << frameworkId;
foreach (const TaskInfo& _task, tasks) {
if (_task.slave_id() != info.id()) {
LOG(WARNING)
<< "Agent " << info.id() << " ignoring running "
<< taskOrTaskGroup(_task, taskGroup) << " because "
<< "it was intended for old agent " << _task.slave_id();
return;
}
}
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
// TODO(bmahler): Also ignore if we're DISCONNECTED.
if (state == RECOVERING || state == TERMINATING) {
LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
<< " because the agent is " << state;
// We do not send `ExitedExecutorMessage` here because the disconnected
// agent is expected to (eventually) reregister and reconcile the executor
// states with the master.
// TODO(vinod): Consider sending a TASK_LOST here.
// Currently it is tricky because 'statusUpdate()'
// ignores updates for unknown frameworks.
return;
}
vector<Future<bool>> unschedules;
  // If we are about to create a new framework, unschedule the
  // framework's work and meta directories from garbage collection.
Framework* framework = getFramework(frameworkId);
if (framework == nullptr) {
// Unschedule framework work directory.
string path = paths::getFrameworkPath(
flags.work_dir, info.id(), frameworkId);
if (os::exists(path)) {
unschedules.push_back(gc->unschedule(path));
}
// Unschedule framework meta directory.
path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
if (os::exists(path)) {
unschedules.push_back(gc->unschedule(path));
}
Option<UPID> frameworkPid = None();
if (pid != UPID()) {
frameworkPid = pid;
}
framework = new Framework(
this,
flags,
frameworkInfo,
frameworkPid);
frameworks[frameworkId] = framework;
if (frameworkInfo.checkpoint()) {
framework->checkpointFramework();
}
// Does this framework ID already exist in `completedFrameworks`?
// If so, move the completed executors of the old framework to
// this new framework and remove the old completed framework.
if (completedFrameworks.contains(frameworkId)) {
Owned<Framework>& completedFramework =
completedFrameworks.at(frameworkId);
framework->completedExecutors = completedFramework->completedExecutors;
completedFrameworks.erase(frameworkId);
}
}
CHECK_NOTNULL(framework);
Option<Error> error = validateResourceLimitsAndIsolators(tasks);
if (error.isSome()) {
// We report TASK_DROPPED to the framework because the task was
// never launched. For non-partition-aware frameworks, we report
// TASK_LOST for backward compatibility.
mesos::TaskState taskState = TASK_DROPPED;
if (!protobuf::frameworkHasCapability(
frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
taskState = TASK_LOST;
}
foreach (const TaskInfo& _task, tasks) {
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
_task.task_id(),
taskState,
TaskStatus::SOURCE_SLAVE,
id::UUID::random(),
error->message,
          TaskStatus::REASON_TASK_INVALID);
statusUpdate(update, UPID());
}
if (framework->idle()) {
removeFramework(framework);
}
return;
}
const ExecutorID& executorId = executorInfo.executor_id();
if (HookManager::hooksAvailable()) {
// Set task labels from run task label decorator.
    foreach (TaskInfo& _task, tasks) {
      _task.mutable_labels()->CopyFrom(
          HookManager::slaveRunTaskLabelDecorator(
              _task, executorInfo, frameworkInfo, info));
    }
// Update `task`/`taskGroup` to reflect the task label updates.
if (task.isSome()) {
CHECK_EQ(1u, tasks.size());
task->mutable_labels()->CopyFrom(tasks[0].labels());
} else {
for (int i = 0; i < taskGroup->tasks().size(); ++i) {
taskGroup->mutable_tasks(i)->mutable_labels()->
CopyFrom(tasks[i].labels());
}
}
}
// Track the pending task / task group to ensure the framework is
// not removed and the framework and top level executor directories
// are not scheduled for deletion before '_run()' is called.
//
// TODO(bmahler): Can we instead track pending tasks within the
// `Executor` struct by creating it earlier?
if (task.isSome()) {
framework->addPendingTask(executorId, task.get());
} else {
framework->addPendingTaskGroup(executorId, taskGroup.get());
}
  // If we are about to create a new executor, unschedule the top
  // level work and meta directories from garbage collection.
Executor* executor = framework->getExecutor(executorId);
if (executor == nullptr) {
// Unschedule executor work directory.
string path = paths::getExecutorPath(
flags.work_dir, info.id(), frameworkId, executorId);
if (os::exists(path)) {
unschedules.push_back(gc->unschedule(path));
}
// Unschedule executor meta directory.
path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
if (os::exists(path)) {
unschedules.push_back(gc->unschedule(path));
}
}
auto onUnscheduleGCFailure =
[=](const Future<vector<bool>>& unschedules) -> Future<vector<bool>> {
LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
<< unschedules.failure();
Framework* _framework = getFramework(frameworkId);
if (_framework == nullptr) {
const string error =
"Cannot handle unschedule GC failure for " +
taskOrTaskGroup(task, taskGroup) + " because the framework " +
stringify(frameworkId) + " does not exist";
LOG(WARNING) << error;
return Failure(error);
}
// We report TASK_DROPPED to the framework because the task was
// never launched. For non-partition-aware frameworks, we report
// TASK_LOST for backward compatibility.
mesos::TaskState taskState = TASK_DROPPED;
if (!protobuf::frameworkHasCapability(
frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
taskState = TASK_LOST;
}
foreach (const TaskInfo& _task, tasks) {
_framework->removePendingTask(_task.task_id());
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
_task.task_id(),
taskState,
TaskStatus::SOURCE_SLAVE,
id::UUID::random(),
"Could not launch the task because we failed to unschedule"
" directories scheduled for gc",
TaskStatus::REASON_GC_ERROR);
// TODO(vinod): Ensure that the task status update manager
// reliably delivers this update. Currently, we don't guarantee
// this because removal of the framework causes the status
// update manager to stop retrying for its un-acked updates.
statusUpdate(update, UPID());
}
if (_framework->idle()) {
removeFramework(_framework);
}
return unschedules;
};
// `taskLaunch` encapsulates each task's launch steps from this point
// to the end of `_run` (the completion of task authorization).
Future<Nothing> taskLaunch = collect(unschedules)
// Handle the failure iff unschedule GC fails.
.repair(defer(self(), onUnscheduleGCFailure))
// If unschedule GC succeeds, trigger the next continuation.
.then(defer(
self(),
&Self::_run,
frameworkInfo,
executorInfo,
task,
taskGroup,
resourceVersionUuids,
launchExecutor));
// Use a sequence to ensure that task launch order is preserved.
framework->taskLaunchSequences[executorId]
.add<Nothing>([taskLaunch]() -> Future<Nothing> {
// We use this sequence only to maintain the task launching order. If the
// sequence is deleted, we do not want the resulting discard event to
// propagate up the chain, which would prevent the previous `.then()` or
// `.repair()` callbacks from being invoked. Thus, we use `undiscardable`
// to protect each `taskLaunch`.
return undiscardable(taskLaunch);
})
// We register `onAny` on the future returned by the sequence (referred to
// as `seqFuture` below). The following scenarios could happen:
//
// (1) `seqFuture` becomes ready. This happens when all previous tasks'
// `taskLaunch` futures are in non-pending state AND this task's own
// `taskLaunch` future is in ready state. The `onReady` call registered
// below will be triggered and continue the success path.
//
// (2) `seqFuture` becomes failed. This happens when all previous tasks'
// `taskLaunch` futures are in non-pending state AND this task's own
// `taskLaunch` future is in failed state (e.g. due to unschedule GC
// failure or some other failure). The `onFailed` call registered below
// will be triggered to handle the failure.
//
// (3) `seqFuture` becomes discarded. This happens when the sequence is
// destructed (see declaration of `taskLaunchSequences` on its lifecycle)
// while the `seqFuture` is still pending. In this case, we wait until
// this task's own `taskLaunch` future becomes non-pending and trigger
// callbacks accordingly.
//
  // TODO(mzhu): In case (3), the destruction of the sequence means that the
  // agent will eventually discover that the executor is absent and drop
  // the task. While `__run` is capable of handling this case, it would be
  // better to handle the failure earlier here rather than waiting for the
  // `taskLaunch` transition and directing control to `__run`.
.onAny(defer(self(), [=](const Future<Nothing>&) {
// We only want to execute the following callbacks once the work performed
// in the `taskLaunch` chain is complete. Thus, we add them onto the
// `taskLaunch` chain rather than dispatching directly.
taskLaunch
.onReady(defer(
self(),
&Self::__run,
frameworkInfo,
executorInfo,
task,
taskGroup,
resourceVersionUuids,
launchExecutor,
executorGeneratedForCommandTask))
.onFailed(defer(self(), [=](const string& failure) {
Framework* _framework = getFramework(frameworkId);
if (_framework == nullptr) {
LOG(WARNING) << "Ignoring running "
<< taskOrTaskGroup(task, taskGroup)
<< " because the framework " << stringify(frameworkId)
<< " does not exist";
}
if (launchExecutor.isSome() && launchExecutor.get()) {
          // The master expects a new executor to be launched for this
          // task (group). To keep the master's executor entries updated,
          // the agent needs to send an `ExitedExecutorMessage` even
          // though no executor was launched.
sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
// See the declaration of `taskLaunchSequences` regarding its
// lifecycle management.
if (_framework != nullptr) {
_framework->taskLaunchSequences.erase(executorInfo.executor_id());
}
}
}));
}));
// TODO(mzhu): Consolidate error handling code in `__run` here.
}
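

// Continuation of 'run()', invoked once any GC unschedule operations
// have completed: re-validates that the framework still exists and is
// not terminating, checks that the task (group) was not killed in the
// interim, and then authorizes the task(s) before control passes to
// '__run()'.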
Future<Nothing> Slave::_run(
const FrameworkInfo& frameworkInfo,
const ExecutorInfo& executorInfo,
const Option<TaskInfo>& task,
const Option<TaskGroupInfo>& taskGroup,
    const vector<ResourceVersionUUID>& resourceVersionUuids,
const Option<bool>& launchExecutor)
{
// TODO(anindya_sinha): Consider refactoring the initial steps common
// to `_run()` and `__run()`.
CHECK_NE(task.isSome(), taskGroup.isSome())
<< "Either task or task group should be set but not both";
vector<TaskInfo> tasks;
if (task.isSome()) {
tasks.push_back(task.get());
} else {
foreach (const TaskInfo& _task, taskGroup->tasks()) {
tasks.push_back(_task);
}
}
const FrameworkID& frameworkId = frameworkInfo.id();
Framework* framework = getFramework(frameworkId);
if (framework == nullptr) {
const string error =
"Ignoring running " + taskOrTaskGroup(task, taskGroup) +
" because the framework " + stringify(frameworkId) + " does not exist";
LOG(WARNING) << error;
return Failure(error);
}
// We don't send a status update here because a terminating
// framework cannot send acknowledgements.
if (framework->state == Framework::TERMINATING) {
const string error = "Ignoring running " +
taskOrTaskGroup(task, taskGroup) + " of framework " +
stringify(frameworkId) +
" because the framework is terminating";
LOG(WARNING) << error;
// Although we cannot send a status update in this case, we remove
// the affected tasks from the pending tasks.
foreach (const TaskInfo& _task, tasks) {
framework->removePendingTask(_task.task_id());
}
if (framework->idle()) {
removeFramework(framework);
}
return Failure(error);
}
// Ignore the launch if killed in the interim. The invariant here
// is that all tasks in the group are still pending, or all were
// removed due to a kill arriving for one of the tasks in the group.
bool allPending = true;
bool allRemoved = true;
foreach (const TaskInfo& _task, tasks) {
if (framework->isPending(_task.task_id())) {
allRemoved = false;
} else {
allPending = false;
}
}
CHECK(allPending != allRemoved)
<< "BUG: The " << taskOrTaskGroup(task, taskGroup)
<< " was partially killed";
if (allRemoved) {
const string error = "Ignoring running " +
taskOrTaskGroup(task, taskGroup) + " of framework " +
stringify(frameworkId) +
" because it has been killed in the meantime";
LOG(WARNING) << error;
return Failure(error);
}
  // Authorize the task or tasks (as in a task group) to ensure that
  // the task user is allowed to launch tasks on the agent. If
  // authorization fails, none of the tasks are launched.
vector<Future<bool>> authorizations;
LOG(INFO) << "Authorizing " << taskOrTaskGroup(task, taskGroup)
<< " for framework " << frameworkId;
foreach (const TaskInfo& _task, tasks) {
authorizations.push_back(authorizeTask(_task, frameworkInfo));
}
auto onTaskAuthorizationFailure =
[=](const string& error, Framework* _framework) {
CHECK_NOTNULL(_framework);
// For failed authorization, we send a TASK_ERROR status update
// for all tasks.
const TaskStatus::Reason reason = task.isSome()
? TaskStatus::REASON_TASK_UNAUTHORIZED
: TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
LOG(ERROR) << "Authorization failed for "
<< taskOrTaskGroup(task, taskGroup) << " of framework "
<< frameworkId << ": " << error;
foreach (const TaskInfo& _task, tasks) {
_framework->removePendingTask(_task.task_id());
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
_task.task_id(),
TASK_ERROR,
TaskStatus::SOURCE_SLAVE,
id::UUID::random(),
error,
reason);
statusUpdate(update, UPID());
}
if (_framework->idle()) {
removeFramework(_framework);
}
};
return collect(authorizations)
.repair(defer(self(),
[=](const Future<vector<bool>>& future) -> Future<vector<bool>>