blob: 87bba8d4edbee5d80fcba3e63466beff366a018c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include "slave/containerizer/fetcher.hpp"
#include <process/async.hpp>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
#include <process/subprocess.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/net.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/uri.hpp>
#ifdef __WINDOWS__
#include <stout/windows.hpp>
#endif // __WINDOWS__
#include <stout/os/exists.hpp>
#include <stout/os/find.hpp>
#include <stout/os/killtree.hpp>
#include <stout/os/realpath.hpp>
#include <stout/os/read.hpp>
#include <stout/os/rmdir.hpp>
#include "hdfs/hdfs.hpp"
#include "common/status_utils.hpp"
#include "slave/containerizer/fetcher_process.hpp"
using std::list;
using std::map;
using std::shared_ptr;
using std::string;
using std::transform;
using std::vector;
using strings::startsWith;
using mesos::fetcher::FetcherInfo;
using process::async;
using process::Failure;
using process::Future;
using process::Owned;
using process::Subprocess;
namespace mesos {
namespace internal {
namespace slave {
// Fixed prefix of every cache file name. Cache::nextFilename() prepends it
// to the generated name and FetcherProcess::cacheFiles() passes it to
// os::find() to locate cache files.
static const string CACHE_FILE_NAME_PREFIX = "c";
// Constructs a Fetcher backed by a new FetcherProcess created from 'flags'.
// If the configured fetcher cache directory already exists, os::rmdir() is
// called on it (second argument 'true', presumably "recursive" - confirm).
// NOTE(review): the head of the statement that the '<<' lines below belong
// to is not visible here, so how a failed removal is reported (log severity,
// fatal or not) cannot be stated - confirm.
Fetcher::Fetcher(const Flags& flags) : process(new FetcherProcess(flags))
if (os::exists(flags.fetcher_cache_dir)) {
Try<Nothing> rmdir = os::rmdir(flags.fetcher_cache_dir, true);
<< "Could not delete fetcher cache directory '"
<< flags.fetcher_cache_dir << "': " + rmdir.error();
// Constructs a Fetcher around a FetcherProcess supplied by the caller.
Fetcher::Fetcher(const process::Owned<FetcherProcess>& process)
: process(process)
// Extracts the final path component of 'uri'.
//
// Returns an Error if the URI contains a backslash, a single quote or a NUL
// character, or if a URI with a protocol specifier has no "/" after the
// protocol part or nothing following the first such "/".
Try<string> Fetcher::basename(const string& uri)
// TODO(bernd-mesos): full URI parsing, then move this to stout.
// There is a bug (or is it a feature?) in the original fetcher
// code without caching that remains in effect here. URIs are
// treated like file paths, looking for occurrences of "/",
// but ignoring other separators that can show up
// (e.g. "?", "=" in HTTP URLs).
if (uri.find_first_of('\\') != string::npos ||
uri.find_first_of('\'') != string::npos ||
uri.find_first_of('\0') != string::npos) {
return Error("Illegal characters in URI");
size_t index = uri.find("://");
// "://" only counts as a protocol separator when at least two characters
// precede it.
if (index != string::npos && 1 < index) {
// URI starts with protocol specifier, e.g., http://, https://,
// ftp://, ftps://, hdfs://, hftp://, s3://, s3n://.
string path = uri.substr(index + 3);
// Reject URIs that have no "/" after the protocol part, or nothing after
// the first "/".
if (!strings::contains(path, "/") || path.size() <= path.find('/') + 1) {
return Error("Malformed URI (missing path): " + uri);
return path.substr(path.find_last_of('/') + 1);
// No protocol specifier: treat the whole string as a file path.
return Path(uri).basename();
// Validates 'uri' by attempting to compute its basename; any error from
// Fetcher::basename() is passed on, otherwise Nothing is returned.
Try<Nothing> Fetcher::validateUri(const string& uri)
Try<string> result = basename(uri);
if (result.isError()) {
return Error(result.error());
return Nothing();
// Validates the output file path given for a URI.
//
// Returns an Error if the basename of 'path' cannot be determined, if 'path'
// is empty, or if 'path' starts with '/'.
Try<Nothing> Fetcher::validateOutputFile(const string& path)
Try<string> result = Path(path).basename();
if (result.isError()) {
return Error(result.error());
if (path.size() == 0) {
return Error("URI output file path is empty");
// TODO(mrbrowning): Check that the filename's directory component is
// actually a subdirectory of the sandbox, not just relative to it.
// A leading '/' is the only "outside the sandbox" case detected here.
if (strings::startsWith(path, '/')) {
return Error("URI output file must be within the sandbox directory");
return Nothing();
// Validates every URI of 'commandInfo' and, where a URI has an output file
// set, that output file as well. Returns the first validation error found,
// or Nothing if all checks pass.
static Try<Nothing> validateUris(const CommandInfo& commandInfo)
foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
Try<Nothing> uriValidation = Fetcher::validateUri(uri.value());
if (uriValidation.isError()) {
return Error(uriValidation.error());
if (uri.has_output_file()) {
// NOTE(review): the right-hand side of this initialization is not visible
// here - presumably a call to Fetcher::validateOutputFile(); confirm.
Try<Nothing> outputFileValidation =
if (outputFileValidation.isError()) {
return Error(outputFileValidation.error());
return Nothing();
// Maps 'uri' to a path on the local file system, if it denotes one.
//
// Returns:
//   - None if 'uri' contains "://" but does not start with the file prefix.
//   - Error if a file URI holds a relative path, or if a relative path was
//     given and 'frameworksHome' is unset or empty.
//   - Otherwise the local path; a relative path is joined onto
//     'frameworksHome'.
Result<string> Fetcher::uriToLocalPath(
const string& uri,
const Option<string>& frameworksHome)
const bool fileUri = strings::startsWith(uri, uri::FILE_PREFIX);
if (!fileUri && strings::contains(uri, "://")) {
return None();
// TODO(andschwa): Fix `path::from_uri` to remove hostname component, which it
// currently does not do, so we remove `localhost` manually here.
string path =
strings::remove(path::from_uri(uri), "localhost", strings::PREFIX);
if (!path::is_absolute(path)) {
if (fileUri) {
return Error("File URI only supports absolute paths");
if (frameworksHome.isNone() || frameworksHome->empty()) {
return Error("A relative path was passed for the resource but the "
"Mesos framework home was not specified. "
"Please either provide this config option "
"or avoid using a relative path");
} else {
path = path::join(frameworksHome.get(), path);
LOG(INFO) << "Prepended Mesos frameworks home to relative path, "
<< "making it: '" << path << "'";
return path;
// Returns true if 'uri' starts with one of the schemes "http://",
// "https://", "ftp://" or "ftps://".
bool Fetcher::isNetUri(const string& uri)
return strings::startsWith(uri, "http://") ||
strings::startsWith(uri, "https://") ||
strings::startsWith(uri, "ftp://") ||
strings::startsWith(uri, "ftps://");
// Public entry point for fetching: hands the request over to the underlying
// FetcherProcess via dispatch().
// NOTE(review): the remaining dispatch() arguments are not visible here -
// presumably &FetcherProcess::fetch followed by the four parameters; confirm.
Future<Nothing> Fetcher::fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
const string& sandboxDirectory,
const Option<string>& user)
return dispatch(process.get(),
// Dispatches FetcherProcess::kill() for 'containerId' to the underlying
// process; the result of the dispatch is not used.
void Fetcher::kill(const ContainerID& containerId)
dispatch(process.get(), &FetcherProcess::kill, containerId);
// Sets up the fetcher metrics. 'task_fetches_succeeded' is given the name
// "containerizer/fetcher/task_fetches_succeeded"; the two lambdas report the
// cache's total and used space in bytes, read through 'fetcher'.
// NOTE(review): the other initializers and the metric registration code are
// not visible here.
FetcherProcess::Metrics::Metrics(FetcherProcess *fetcher)
: task_fetches_succeeded("containerizer/fetcher/task_fetches_succeeded"),
[=]() {
// This value is safe to read while it is concurrently updated.
return static_cast<double>(fetcher->cache.totalSpace().bytes());
[=]() {
// This value is safe to read while it is concurrently updated.
return static_cast<double>(fetcher->cache.usedSpace().bytes());
// NOTE(review): the comment below appears to belong to a separate definition
// (presumably the Metrics destructor) whose code is not visible here.
// Wait for the metrics to be removed before we allow the destructor
// to complete.
// Constructs the process with a generated process ID based on "fetcher".
// NOTE(review): the rest of the initializer list is not visible here, and the
// loop over 'subprocessPids' below appears to belong to a separate definition
// (presumably the destructor); its body is not visible either - confirm.
FetcherProcess::FetcherProcess(const Flags& _flags)
: ProcessBase(process::ID::generate("fetcher")),
foreachkey (const ContainerID& containerId, subprocessPids) {
// Find out how large a potential download from the given URI is.
//
// Three sources are tried in order:
//   (1) a local path (see Fetcher::uriToLocalPath), sized via os::stat::size;
//   (2) a network URI (see Fetcher::isNetUri), sized via net::contentLength;
//   (3) anything else is handed to the HDFS client (non-Windows only).
// Returns an Error if the size cannot be determined, or if a network URI
// reports a content length of 0.
static Try<Bytes> fetchSize(
const string& uri,
const Option<string>& frameworksHome)
VLOG(1) << "Fetching size for URI: " << uri;
Result<string> path = Fetcher::uriToLocalPath(uri, frameworksHome);
if (path.isError()) {
return Error(path.error());
// Case (1): the URI denotes a local file.
if (path.isSome()) {
Try<Bytes> size = os::stat::size(
path.get(), os::stat::FollowSymlink::FOLLOW_SYMLINK);
if (size.isError()) {
return Error("Could not determine file size for: '" + path.get() +
"', error: " + size.error());
return size.get();
// Case (2): HTTP(S)/FTP(S) download.
if (Fetcher::isNetUri(uri)) {
Try<Bytes> size = net::contentLength(uri);
if (size.isError()) {
return Error(size.error());
// A reported length of 0 is treated as an error rather than as a size.
if (size.get() == 0) {
return Error("URI reported content-length 0: " + uri);
return size.get();
// TODO(hausdorff): (MESOS-5460) Explore adding support for fetching from
// HDFS.
// Case (3): fall back to asking the HDFS client.
#ifndef __WINDOWS__
Try<Owned<HDFS>> hdfs = HDFS::create();
if (hdfs.isError()) {
return Error("Failed to create HDFS client: " + hdfs.error());
Future<Bytes> size = hdfs.get()->du(uri);
// NOTE(review): the future is inspected right after being obtained; any
// waiting on it is not visible here - confirm.
if (!size.isReady()) {
return Error("Hadoop client could not determine size: " +
(size.isFailed() ? size.failure() : "discarded"));
return size.get();
return Error("Windows currently does not support fetching files from HDFS");
#endif // __WINDOWS__
// First phase of fetching the URIs of 'commandInfo' for a container.
//
// Validates the URIs, determines the effective user and the cache directory,
// and builds for every URI either None (cache bypassed) or a future to a
// cache entry. The collected map is then handed to '_fetch'. Fails with
// "Could not fetch: ..." if URI validation fails.
Future<Nothing> FetcherProcess::fetch(
const ContainerID& containerId,
const CommandInfo& commandInfo,
const string& sandboxDirectory,
const Option<string>& user)
VLOG(1) << "Starting to fetch URIs for container: " << containerId
<< ", directory: " << sandboxDirectory;
Try<Nothing> validated = validateUris(commandInfo);
if (validated.isError()) {
return Failure("Could not fetch: " + validated.error());
// A user set in the CommandInfo takes precedence over the 'user' argument.
Option<string> commandUser = user;
if (commandInfo.has_user()) {
commandUser = commandInfo.user();
string cacheDirectory = flags.fetcher_cache_dir;
if (commandUser.isSome()) {
// Segregating per-user cache directories.
cacheDirectory = path::join(cacheDirectory, commandUser.get());
// For each URI we determine if we should use the cache and if so we
// try and either get the cache entry or create a cache entry. If
// we're getting the cache entry then we might need to wait for that
// cache entry to be downloaded. If we're creating a new cache entry
// then we need to properly reserve the cache space (and perform any
// evictions). Thus, there are three possibilities for each URI:
// (1) We are not using the cache.
// (2) We are using the cache but need to wait for an entry to be
// downloaded.
// (3) We are using the cache and need to create a new entry.
// We capture whether or not we're using the cache using an Option
// as a value in a map, i.e., if we are not trying to use the cache
// as in (1) above then the Option is None otherwise as in (2) and
// (3) the Option is Some. And to capture the asynchronous nature of
// both (2) and (3) that Option holds a Future to the actual cache
// entry.
hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>> entries;
// When we create new entries, we need to track whether we already have
// an entry for the corresponding URI value. This handles the case where
// multiple entries have the same URI value (but hash differently because
// they differ in other fields). If we see the same URI value multiple
// times, then we simply add references to the initial entry.
hashmap<string, shared_ptr<Cache::Entry>> newEntries;
foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
// Case (1): caching is not requested for this URI.
if (!uri.cache()) {
entries[uri] = None();
if (newEntries.contains(uri.value())) {
// NOTE(review): the right-hand side of this assignment is not visible here -
// presumably the entry already recorded in 'newEntries' for this URI value;
// confirm.
entries[uri] =;
// Check if this is already in the cache (but not necessarily
// downloaded).
const Option<shared_ptr<Cache::Entry>> entry =
cache.get(commandUser, uri.value());
if (entry.isSome()) {
// Wait for the URI to be downloaded into the cache (or fail)
entries[uri] = entry.get()->completion()
.then(defer(self(), [=]() {
return Future<shared_ptr<Cache::Entry>>(entry.get());
} else {
// Case (3): create a new entry, then determine the download size
// asynchronously and reserve that much cache space for it.
shared_ptr<Cache::Entry> newEntry =
cache.create(cacheDirectory, commandUser, uri);
newEntries.put(uri.value(), newEntry);
entries[uri] =
async([=]() {
return fetchSize(uri.value(), flags.frameworks_home);
.then(defer(self(), [=](const Try<Bytes>& requestedSpace) {
return reserveCacheSpace(requestedSpace, newEntry);
// NOTE: We explicitly call the continuation '_fetch' even though it
// looks like we could easily inline it here because we want to be
// able to mock the function for testing! Don't remove this!
return _fetch(entries,
// Second phase of fetching: waits for all pending cache entry futures and
// converts the map of optional futures into a map of optional entries, which
// is passed on to '__fetch'. A URI whose future did not become ready is
// mapped to None, i.e. it is fetched without the cache.
Future<Nothing> FetcherProcess::_fetch(
const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
const ContainerID& containerId,
const string& sandboxDirectory,
const string& cacheDirectory,
const Option<string>& user)
// Get out all of the futures we need to wait for so we can wait on
// them together via 'await'.
vector<Future<shared_ptr<Cache::Entry>>> futures;
foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
entries) {
// NOTE(review): the body of this condition is not visible here - presumably
// it appends the future to 'futures'; confirm.
if (entry.isSome()) {
return await(futures)
.then(defer(self(), [=]() {
// For each URI, if there is a potential cache entry and waiting
// for its associated future was successful, extract the entry
// from the future and store it in 'result'. Otherwise we assume
// we are not using or cannot use the cache for this URI.
hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
foreachpair (const CommandInfo::URI& uri,
const Option<Future<shared_ptr<Cache::Entry>>>& entry,
entries) {
if (entry.isSome()) {
if (entry->isReady()) {
result[uri] = entry->get();
} else {
// NOTE(review): the head of the logging statement for the lines below is not
// visible here, so its severity cannot be stated.
<< "Reverting to fetching directly into the sandbox for '"
<< uri.value()
<< "', due to failure to fetch through the cache, "
<< "with error: " << entry->failure();
result[uri] = None();
} else {
// No entry means bypassing the cache.
result[uri] = None();
// NOTE: While we could inline '__fetch' we've explicitly kept
// it as a separate function to minimize complexity. Like with
// '_fetch', this also enables this phase of the fetcher cache
// to easily be mocked for testing!
return __fetch(result,
// Third phase of fetching: builds a FetcherInfo with one item per URI, runs
// the mesos-fetcher via run(), and afterwards updates the cache entries.
//
// If run() fails, the error is logged, pending entries are removed from the
// cache, and the failure is passed on. If it succeeds, cache.adjust() is
// called for each entry that is still pending.
// NOTE(review): many statements of this function (item setup, completing or
// failing entries, reference counting) are not visible here.
Future<Nothing> FetcherProcess::__fetch(
const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
const ContainerID& containerId,
const string& sandboxDirectory,
const string& cacheDirectory,
const Option<string>& user)
// Now construct the FetcherInfo based on which URIs we're using
// the cache for and which ones we are bypassing the cache.
FetcherInfo info;
foreachpair (const CommandInfo::URI& uri,
const Option<shared_ptr<Cache::Entry>>& entry,
entries) {
FetcherInfo::Item* item = info.add_items();
if (entry.isSome()) {
if (entry.get()->completion().isPending()) {
// Since the entry is not yet "complete", i.e.,
// 'completion().isPending()', it must be the case that we created
// the entry in FetcherProcess::fetch(). Otherwise the entry should
// have been in the cache already and we would have waited for its
// completion in FetcherProcess::fetch().
} else {
} else {
if (user.isSome()) {
if (!flags.frameworks_home.empty()) {
return run(containerId, sandboxDirectory, user, info)
// Failure path: log, drop entries that were still being downloaded, and
// hand the original failure back to the caller.
.repair(defer(self(), [=](const Future<Nothing>& future) {
LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
if (entry.isSome()) {
if (entry.get()->completion().isPending()) {
// Unsuccessfully (or partially) downloaded! Remove from the cache.
cache.remove(entry.get()); // Might delete partial download.
return future; // Always propagate the failure!
// Call to `operator` here forces the conversion on MSVC. This is implicit
// on clang and gcc.
.operator std::function<process::Future<Nothing>(
const process::Future<Nothing> &)>())
// Success path: reconcile the recorded size of freshly downloaded entries
// with the actual file size.
.then(defer(self(), [=]() {
foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
if (entry.isSome()) {
if (entry.get()->completion().isPending()) {
// Successfully downloaded and cached!
Try<Nothing> adjust = cache.adjust(entry.get());
if (adjust.isSome()) {
} else {
LOG(WARNING) << "Failed to adjust the cache size for entry '"
<< entry.get()->key << "' with error: "
<< adjust.error();
// Successfully fetched, but not reusable from the
// cache, because we are deleting the entry now.
return Nothing();
// Computes the signed difference, in bytes, between the actual size of a
// downloaded file and the size recorded in its cache entry.
//
// Returns a negative value if the file is smaller than recorded, a positive
// value if it is larger, and 0 if both sizes match. A warning is logged for
// either mismatch.
static off_t delta(
const Bytes& actualSize,
const shared_ptr<FetcherProcess::Cache::Entry>& entry)
if (actualSize < entry->size) {
Bytes delta = entry->size - actualSize;
LOG(WARNING) << "URI download result for '" << entry->key
<< "' is smaller than expected by " << stringify(delta)
<< " at: " << entry->path();
return -off_t(delta.bytes());
} else if (actualSize > entry->size) {
Bytes delta = actualSize - entry->size;
LOG(WARNING) << "URI download result for '" << entry->key
<< "' is larger than expected by " << stringify(delta)
<< " at: " << entry->path();
return off_t(delta.bytes());
return 0;
// For testing only.
//
// Lists the cache files found under the fetcher cache directory by searching
// for CACHE_FILE_NAME_PREFIX with os::find(). Returns an empty list if the
// directory does not exist, and an Error if the search fails.
Try<list<Path>> FetcherProcess::cacheFiles() const
list<Path> result;
if (!os::exists(flags.fetcher_cache_dir)) {
return result;
const Try<list<string>> find =
os::find(flags.fetcher_cache_dir, CACHE_FILE_NAME_PREFIX);
if (find.isError()) {
return Error("Could not access cache directory '" +
flags.fetcher_cache_dir + "' with error: " + find.error());
// NOTE(review): the head of the call that this lambda is an argument of is
// not visible here - presumably a transform() of the found paths into
// 'result'; confirm.
[](const string& path) { return Path(path); });
return result;
// For testing only.
//
// Returns cache.size(), i.e. the number of entries in the cache table.
size_t FetcherProcess::cacheSize() const
return cache.size();
// Returns the space currently available in the cache, as reported by
// Cache::availableSpace().
Bytes FetcherProcess::availableCacheSpace() const
return cache.availableSpace();
// NOTE(review): the line carrying the return type and name of this function
// is not visible here; the call in FetcherProcess::fetch() and the comment in
// Cache::remove() suggest it is FetcherProcess::reserveCacheSpace() - confirm.
//
// Reserves 'requestedSpace' bytes in the cache for 'entry'. Yields a Failure
// if the size could not be determined or the reservation fails; otherwise
// records the size on the entry and returns the entry.
const Try<Bytes>& requestedSpace,
const shared_ptr<FetcherProcess::Cache::Entry>& entry)
if (requestedSpace.isError()) {
// Let anyone waiting on this future know that we've
// failed to download and they should bypass the cache
// (any new requests will try again).
return Failure("Could not determine size of cache file for '" +
entry->key + "' with error: " +
Try<Nothing> reservation = cache.reserve(requestedSpace.get());
if (reservation.isError()) {
// Let anyone waiting on this future know that we've
// failed to download and they should bypass the cache
// (any new requests will try again).
return Failure("Failed to reserve space in the cache: " +
VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
// NOTE: We must set the entry size only when we are also claiming
// the space! Other functions rely on this dependency (see
// Cache::remove()).
entry->size = requestedSpace.get();
return entry;
// Runs the mesos-fetcher binary as a subprocess for 'containerId'.
//
// Opens 'stdout' and 'stderr' files in the sandbox directory named by 'info'
// (chown'ing them to 'user' on non-Windows platforms), builds the
// subprocess environment, and starts the fetcher with its output redirected
// to those files. Returns Nothing right away if 'info' has no items.
// The returned future fails if no exit status is available or if the status
// does not indicate success; on failure the fetcher's stderr file is read
// and logged.
Future<Nothing> FetcherProcess::run(
const ContainerID& containerId,
const string& sandboxDirectory,
const Option<string>& user,
const FetcherInfo& info)
// Before we fetch let's make sure we create 'stdout' and 'stderr'
// files into which we can redirect the output of the mesos-fetcher
// (and later redirect the child's stdout/stderr).
// TODO(tillt): Consider updating fetcher::run to take paths
// instead of file descriptors and then use Subprocess::PATH()
// instead of Subprocess::FD(). The reason this can't easily be done
// today is because we not only need to open the files but also
// chown them.
const string stdoutPath = path::join(info.sandbox_directory(), "stdout");
// NOTE(review): the os::open() arguments are not visible here.
Try<int_fd> out = os::open(
if (out.isError()) {
return Failure("Failed to create 'stdout' file: " + out.error());
string stderrPath = path::join(info.sandbox_directory(), "stderr");
// NOTE(review): the os::open() arguments are not visible here.
Try<int_fd> err = os::open(
if (err.isError()) {
return Failure("Failed to create 'stderr' file: " + err.error());
// NOTE: `os::chown` is not supported on Windows. The flag that gets passed in
// here is conditionally compiled out on Windows.
#ifndef __WINDOWS__
if (user.isSome()) {
// TODO(megha.sharma): Fetcher should not create separate stdout/stderr
// files but rather use FDs prepared by the container logger.
// See MESOS-6271 for more details.
Try<Nothing> chownOut = os::chown(
if (chownOut.isError()) {
return Failure(
"Failed to chown '" +
stdoutPath +
"' to user '" + user.get() + "' : " +
Try<Nothing> chownErr = os::chown(
if (chownErr.isError()) {
return Failure(
"Failed to chown '" +
stderrPath +
"' to user '" + user.get() + "' : " +
#endif // __WINDOWS__
// Return early if there are no URIs to fetch.
if (info.items_size() == 0) {
return Nothing();
// The fetcher binary is looked up in the launcher directory; the file name
// differs on Windows.
#ifdef __WINDOWS__
string fetcherPath = path::join(flags.launcher_dir, "mesos-fetcher.exe");
string fetcherPath = path::join(flags.launcher_dir, "mesos-fetcher");
#endif // __WINDOWS__
Result<string> realpath = os::realpath(fetcherPath);
if (!realpath.isSome()) {
LOG(ERROR) << "Failed to determine the canonical path "
<< "for the mesos-fetcher '"
<< fetcherPath
<< "': "
<< (realpath.isError() ? realpath.error()
: "No such file or directory");
return Failure("Could not fetch URIs: failed to find mesos-fetcher");
// Now the actual mesos-fetcher command.
string command = realpath.get();
// We pass arguments to the fetcher program by means of an
// environment variable.
// For assuring that we pass on variables that may be consumed by
// the mesos-fetcher, we whitelist them before masking out any
// unwanted agent->fetcher environment spillover.
// TODO(tillt): Consider using the `mesos::internal::logging::Flags`
// to determine the whitelist.
// NOTE(review): the whitelist contents are not visible here.
const hashset<string> whitelist = {
map<string, string> environment;
// Keep whitelisted variables, and any variable that starts with neither
// "LIBPROCESS_" nor "MESOS_".
foreachpair (const string& key, const string& value, os::environment()) {
if (whitelist.contains(strings::upper(key)) ||
(!startsWith(key, "LIBPROCESS_") && !startsWith(key, "MESOS_"))) {
environment.emplace(key, value);
// The FetcherInfo is handed to the fetcher as JSON in this variable.
environment["MESOS_FETCHER_INFO"] = stringify(JSON::protobuf(info));
if (flags.hadoop_home.isSome()) {
environment["HADOOP_HOME"] = flags.hadoop_home.get();
// TODO(jieyu): This is to make sure the libprocess of the fetcher
// can properly initialize and find the IP. Since we don't need to
// use the TCP socket for communication, it's OK to use a local
// address. Consider disabling the TCP socket in libprocess if libprocess
// supports that.
// NOTE(review): the value below is an empty string although the comment
// above speaks of a local address - confirm the intended value.
environment.emplace("LIBPROCESS_IP", "");
VLOG(1) << "Fetching URIs using command '" << command << "'";
// NOTE(review): only the stdout/stderr arguments of the subprocess() call
// are visible here.
Try<Subprocess> fetcherSubprocess = subprocess(
Subprocess::FD(out.get(), Subprocess::IO::OWNED),
Subprocess::FD(err.get(), Subprocess::IO::OWNED),
if (fetcherSubprocess.isError()) {
return Failure("Failed to execute mesos-fetcher: " +
// Remember this PID in case we need to kill the subprocess. See
// FetcherProcess::kill(). This value gets removed after we wait on
// the subprocess.
subprocessPids[containerId] = fetcherSubprocess->pid();
return fetcherSubprocess->status()
// Translate the exit status into success or failure of the returned future.
.then(defer(self(), [=](const Option<int>& status) -> Future<Nothing> {
if (status.isNone()) {
return Failure("No status available from mesos-fetcher");
if (!WSUCCEEDED(status.get())) {
return Failure("Failed to fetch all URIs for container '" +
stringify(containerId) + "': " +
return Nothing();
.onFailed(defer(self(), [=](const string&) {
// To aid debugging what went wrong when attempting to fetch, grab the
// fetcher's local log output from the sandbox and log it here.
Try<string> text = os::read(stderrPath);
if (text.isSome()) {
LOG(WARNING) << "Begin fetcher log (stderr in sandbox) for container "
<< containerId << " from running command: " << command
<< "\n" << text.get() << "\n"
<< "End fetcher log for container " << containerId;
} else {
LOG(ERROR) << "Fetcher log (stderr in sandbox) for container "
<< containerId << " not readable: " << text.error();
.onAny(defer(self(), [=](const Future<Nothing>&) {
// Clear the subprocess PID remembered from running mesos-fetcher.
// Kills the fetcher subprocess recorded for 'containerId', if there is one,
// by sending SIGKILL via os::killtree(). Does nothing when no PID is recorded
// for the container.
void FetcherProcess::kill(const ContainerID& containerId)
if (subprocessPids.contains(containerId)) {
VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
// Best effort kill the entire fetcher tree.
// NOTE(review): the first argument of this call is not visible here -
// presumably the PID stored in 'subprocessPids' for this container; confirm.
os::killtree(, SIGKILL);
// Generates the file name for a new cache file belonging to 'uri'.
//
// The name has the form <CACHE_FILE_NAME_PREFIX><filenameSerial>-<s>, where
// <s> is the URI's basename, shortened to its first and last 10 characters
// joined by "_" when it is longer than 20 characters.
// NOTE(review): 'base' is dereferenced without a visible error check, and no
// update of 'filenameSerial' is visible here - confirm against the full
// source.
string FetcherProcess::Cache::nextFilename(const CommandInfo::URI& uri)
// Different URIs may have the same base name, so we need to
// segregate the download results. This can be done by separate
// directories or by different file names. We opt for the latter
// since there may be tighter limits on how many sub-directories a
// file system can bear than on how many files can be in a directory.
// We put a fixed prefix upfront before the serial number so we can
// later easily find cache files with os::find() to support testing.
// Why we keep the file extension here: When fetching from cache, if
// extraction is enabled, the extraction algorithm can look at the
// extension of the cache file the same way as it would at a
// download of the original URI, and external commands performing
// the extraction do not get confused by their source file
// missing an expected form of extension. This is included in the
// following.
// Just for human operators who want to take a look at the cache
// and relate cache files to URIs, we also add some of the URI's
// basename, but not too much so we do not exceed file name size
// limits.
Try<string> base = Fetcher::basename(uri.value());
string s = base.get();
if (s.size() > 20) {
// Grab only a prefix and a suffix, but for sure including the
// file extension.
s = s.substr(0, 10) + "_" + s.substr(s.size() - 10, string::npos);
return CACHE_FILE_NAME_PREFIX + stringify(filenameSerial) + "-" + s;
// Builds the key under which a URI is stored in the cache table: the URI
// itself when no user is given, otherwise "<user>@<uri>".
static string cacheKey(const Option<string>& user, const string& uri)
return user.isNone() ? uri : user.get() + "@" + uri;
// Creates a new cache entry for 'uri' on behalf of 'user', with a freshly
// generated file name in 'cacheDirectory', stores it in the cache table under
// its key, and returns it.
// NOTE(review): any further bookkeeping (e.g. LRU ordering) is not visible
// here.
shared_ptr<FetcherProcess::Cache::Entry> FetcherProcess::Cache::create(
const string& cacheDirectory,
const Option<string>& user,
const CommandInfo::URI& uri)
const string key = cacheKey(user, uri.value());
const string filename = nextFilename(uri);
auto entry = shared_ptr<Cache::Entry>(
new Cache::Entry(key, cacheDirectory, filename));
table.put(key, entry);
VLOG(1) << "Created cache entry '" << key << "' with file: " << filename;
return entry;
// NOTE(review): the line carrying the return type and name of this function
// is not visible here; the call 'cache.get(commandUser, uri.value())' in
// FetcherProcess::fetch() suggests it is FetcherProcess::Cache::get() -
// confirm.
//
// Looks up the cache entry for 'uri' and 'user'. An entry whose download has
// completed is validated first; if validation fails, a warning is logged and
// None is returned.
const Option<string>& user,
const string& uri)
const string key = cacheKey(user, uri);
Option<shared_ptr<Entry>> entry = table.get(key);
if (entry.isSome()) {
// The FetcherProcess will always remove a failed download
// synchronously after marking this future as failed.
// Validate the cache file, if it has been downloaded.
if (entry.get()->completion().isReady()) {
Try<Nothing> validation = validate(entry.get());
if (validation.isError()) {
LOG(WARNING) << "Validation failed: '" + validation.error() +
"'. Removing cache entry...";
return None();
// Refresh the cache entry by moving it to the back of lruSortedEntries.
return entry;
// Returns true if the cache table holds an entry under the key derived from
// 'user' and 'uri'.
bool FetcherProcess::Cache::contains(
const Option<string>& user,
const string& uri) const
const string key = cacheKey(user, uri);
return table.contains(key);
// Returns true if the cache table holds exactly this entry: an entry stored
// under 'entry->key' must exist and compare equal to 'entry'.
bool FetcherProcess::Cache::contains(
const shared_ptr<Cache::Entry>& entry) const
Option<shared_ptr<Cache::Entry>> found = table.get(entry->key);
if (found.isNone()) {
return false;
return found == entry;
// We are removing an entry if:
// (1) We failed to determine its prospective cache file size.
// (2) We failed to download it when invoking the mesos-fetcher.
// (3) We're evicting it to make room for another entry.
// (4) We failed to validate the cache file.
// In (1) and (2) the contract is that we'll have failed the entry's
// future before we call remove, so the entry's future should no
// longer be pending.
// In (3) it should be the case that the future is no longer pending,
// because we shouldn't be able to evict something if we're
// currently downloading it, because it should have a non-zero
// reference count and therefore the future must either be ready or
// failed in which case this is just case (1) above.
// In (4) we explicitly only validate a cache file if the future is
// ready (i.e., the file has been downloaded).
// NOTE: It is not necessarily the case that this cache entry has
// zero references because there might be some waiters on the
// downloading of this entry which haven't been able to run and find
// out that the downloading failed.
// We want to attempt to delete the file regardless of whether it has been
// downloaded completely, since it might have been downloaded partially! Deleting
// this file should not be racing with any other downloading or
// deleting because all calls into the cache are serialized by the
// FetcherProcess and since this entry is already in the cache there
// should not be any other conflicting entries or files representing
// this entry. Furthermore every cache file has a unique name. Thus
// no new download conflicts with the manipulation of any pre-existing
// cache content.
// Removes 'entry' from the cache: deletes its cache file if one exists and
// resets the entry's size to 0 if it was greater than 0.
//
// Returns an Error if the cache file exists but cannot be deleted.
// NOTE(review): removal from the table/LRU list and the release of claimed
// space are not visible here.
Try<Nothing> FetcherProcess::Cache::remove(
const shared_ptr<Cache::Entry>& entry)
VLOG(1) << "Removing cache entry '" << entry->key
<< "' with filename: " << entry->filename;
// We may or may not have started downloading. The download may or may
// not have been partial. In any case, clean up whatever is there.
if (os::exists(entry->path().string())) {
Try<Nothing> rm = os::rm(entry->path().string());
if (rm.isError()) {
return Error("Could not delete fetcher cache file '" +
entry->path().string() + "' with error: " + rm.error() +
" for entry '" + entry->key +
"', leaking cache space: " + stringify(entry->size));
// NOTE: There is an assumption that if and only if 'entry->size > 0'
// then we've claimed cache space for this entry! This currently only
// gets set in reserveCacheSpace().
if (entry->size > 0) {
entry->size = 0;
return Nothing();
// Select LRU cache entries for cache eviction.
//
// Walks 'lruSortedEntries' in order, counting the size of each unreferenced
// entry, and returns the victims as soon as the accumulated size reaches
// 'requiredSpace'. Returns an Error if the list is exhausted first.
// NOTE(review): the return type line and the statement adding an entry to
// 'victims' are not visible here.
FetcherProcess::Cache::selectVictims(const Bytes& requiredSpace)
list<shared_ptr<FetcherProcess::Cache::Entry>> victims;
Bytes space = 0;
foreach (const shared_ptr<Cache::Entry>& entry, lruSortedEntries) {
// Entries that are still referenced are skipped.
if (!entry->isReferenced()) {
space += entry->size;
if (space >= requiredSpace) {
return victims;
return Error("Could not find enough cache files to evict");
// Makes room for 'requestedSpace' bytes in the cache. If less space than
// requested is available, victim entries are selected and removed.
//
// Returns an Error if no suitable victims can be found or if removing one of
// them fails.
// NOTE(review): the claiming of the requested space is not visible here.
Try<Nothing> FetcherProcess::Cache::reserve(
const Bytes& requestedSpace)
if (availableSpace() < requestedSpace) {
Bytes missingSpace = requestedSpace - availableSpace();
VLOG(1) << "Freeing up fetcher cache space for: " << missingSpace;
// NOTE(review): the right-hand side of this initialization is not visible
// here - presumably selectVictims(missingSpace); confirm.
const Try<list<shared_ptr<Cache::Entry>>> victims =
if (victims.isError()) {
return Error("Could not free up enough fetcher cache space");
foreach (const shared_ptr<Cache::Entry>& entry, victims.get()) {
Try<Nothing> removal = remove(entry);
if (removal.isError()) {
return Error(removal.error());
return Nothing();
// Validates the cache file of 'entry'. The only check visible here is that
// the file exists; an Error is returned if it does not.
Try<Nothing> FetcherProcess::Cache::validate(
const std::shared_ptr<Cache::Entry>& entry)
VLOG(1) << "Validating cache entry '" << entry->key
<< "' with filename: " << entry->filename;
if (!os::exists(entry->path().string())) {
return Error("Cache file does not exist: " + entry->filename);
// TODO(abudnik): Consider adding validation of the cache file by either:
// 1. Comparing a known file checksum with the actual checksum of the file
// stored on disk.
// 2. Reading the whole file by chunks. Many filesystems detect data
// corruptions when reading file's data. As a positive side effect,
// the file's data will be loaded into the page cache.
return Nothing();
// Reconciles the size recorded in 'entry' with the actual size of its cache
// file. If the file is not larger than recorded, the entry's size is set to
// the actual size.
//
// Returns an Error if the file turned out larger than recorded, or if its
// size cannot be determined.
// NOTE(review): the os::stat::size() arguments and any release of surplus
// space are not visible here.
Try<Nothing> FetcherProcess::Cache::adjust(
const shared_ptr<FetcherProcess::Cache::Entry>& entry)
Try<Bytes> size = os::stat::size(
if (size.isSome()) {
// 'd' is negative when the file is smaller than recorded, positive when
// larger (see delta()).
off_t d = delta(size.get(), entry);
if (d <= 0) {
entry->size = size.get();
} else {
return Error("More cache size now necessary, not adjusting " +
} else {
// This should never be caused by Mesos itself, but cannot be excluded.
return Error("Fetcher cache file for '" + entry->key +
"' disappeared from: " + entry->path().string());
return Nothing();
// Returns the number of entries in the cache table.
size_t FetcherProcess::Cache::size() const
return table.size();
// Adds 'bytes' to the tally of used cache space. Exceeding the total cache
// space is not prevented here; it only triggers a warning.
void FetcherProcess::Cache::claimSpace(const Bytes& bytes)
tally += bytes;
if (tally > space) {
// Used cache volume space exceeds the maximum amount set by
// flags.fetcher_cache_size. This may be tolerated temporarily,
// if there is sufficient physical space available. But it can
// otherwise cause unspecified system behavior at any moment.
LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
<< ", exceeds total fetcher cache space: " << space;
VLOG(1) << "Claimed cache space: " << bytes << ", now using: " << tally;
// Subtracts 'bytes' from the tally of used cache space. Releasing more than
// is currently in use violates the CHECK below.
void FetcherProcess::Cache::releaseSpace(const Bytes& bytes)
CHECK(bytes <= tally) << "Attempt to release more cache space than in use - "
<< " requested: " << bytes << ", in use: " << tally;
tally -= bytes;
VLOG(1) << "Released cache space: " << bytes << ", now using: " << tally;
// Returns the total cache space ('space').
Bytes FetcherProcess::Cache::totalSpace() const
return space;
// Returns the cache space currently in use ('tally').
Bytes FetcherProcess::Cache::usedSpace() const
return tally;
// Returns the cache space still available, i.e. total minus used space.
// If more space is in use than the total, a warning is logged and 0 is
// returned instead of a negative difference.
Bytes FetcherProcess::Cache::availableSpace() const
if (tally > space) {
LOG(WARNING) << "Fetcher cache space overflow - space used: " << tally
<< ", exceeds total fetcher cache space: " << space;
return 0;
return space - tally;
// NOTE(review): the body of this function is not visible here - presumably
// it fulfils the entry's promise so that completion() becomes ready; confirm.
void FetcherProcess::Cache::Entry::complete()
// Returns the future associated with this entry's promise.
Future<Nothing> FetcherProcess::Cache::Entry::completion()
return promise.future();
// Checks that the entry's future is still pending.
// NOTE(review): the line below looks like two statements fused together; the
// head of the second one is not visible - presumably a call that fails the
// promise with the message shown; confirm.
void FetcherProcess::Cache::Entry::fail()
CHECK_PENDING(promise.future());"Could not download to fetcher cache: " + key);
// NOTE(review): the body of this function is not visible here - presumably
// it increments 'referenceCount'; confirm.
void FetcherProcess::Cache::Entry::reference()
// Requires that the entry is currently referenced (reference count > 0).
// NOTE(review): the statement changing 'referenceCount' is not visible here -
// presumably a decrement; confirm.
void FetcherProcess::Cache::Entry::unreference()
CHECK(referenceCount > 0);
// Returns true while the entry's reference count is greater than zero.
bool FetcherProcess::Cache::Entry::isReferenced() const
return referenceCount > 0;
} // namespace slave {
} // namespace internal {
} // namespace mesos {