// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <process/owned.hpp>
#include <stout/json.hpp>
#include <stout/net.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/strings.hpp>
#include <mesos/mesos.hpp>
#include <mesos/fetcher/fetcher.hpp>
#include "hdfs/hdfs.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
using namespace process;
using namespace mesos;
using namespace mesos::internal;
using std::string;
using mesos::fetcher::FetcherInfo;
using mesos::internal::slave::Fetcher;
// Try to extract sourcePath into directory. If sourcePath is
// recognized as an archive it will be extracted and true returned;
// if not recognized then false will be returned. An Error is
// returned if the extraction command fails.
static Try<bool> extract(
const string& sourcePath,
const string& destinationDirectory)
string command;
// Extract any .tar, .tgz, tar.gz, tar.bz2 or zip files.
if (strings::endsWith(sourcePath, ".tar") ||
strings::endsWith(sourcePath, ".tgz") ||
strings::endsWith(sourcePath, ".tar.gz") ||
strings::endsWith(sourcePath, ".tbz2") ||
strings::endsWith(sourcePath, ".tar.bz2") ||
strings::endsWith(sourcePath, ".txz") ||
strings::endsWith(sourcePath, ".tar.xz")) {
command = "tar -C '" + destinationDirectory + "' -xf";
} else if (strings::endsWith(sourcePath, ".gz")) {
string pathWithoutExtension = sourcePath.substr(0, sourcePath.length() - 3);
string filename = Path(pathWithoutExtension).basename();
command = "gzip -dc > '" + destinationDirectory + "/" + filename + "' <";
} else if (strings::endsWith(sourcePath, ".zip")) {
command = "unzip -d '" + destinationDirectory + "'";
} else {
return false;
command += " '" + sourcePath + "'";
LOG(INFO) << "Extracting with command: " << command;
int status = os::system(command);
if (status != 0) {
return Error("Failed to extract: command " + command +
" exited with status: " + stringify(status));
LOG(INFO) << "Extracted '" << sourcePath << "' into '"
<< destinationDirectory << "'";
return true;
// Attempt to get the uri using the hadoop client.
static Try<string> downloadWithHadoopClient(
const string& sourceUri,
const string& destinationPath)
Try<Owned<HDFS>> hdfs = HDFS::create();
if (hdfs.isError()) {
return Error("Failed to create HDFS client: " + hdfs.error());
LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
<< "' to '" << destinationPath << "'";
Future<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
if (!result.isReady()) {
return Error("HDFS copyToLocal failed: " +
(result.isFailed() ? result.failure() : "discarded"));
return destinationPath;
static Try<string> downloadWithNet(
const string& sourceUri,
const string& destinationPath)
// The net::download function only supports these protocols.
CHECK(strings::startsWith(sourceUri, "http://") ||
strings::startsWith(sourceUri, "https://") ||
strings::startsWith(sourceUri, "ftp://") ||
strings::startsWith(sourceUri, "ftps://"));
LOG(INFO) << "Downloading resource from '" << sourceUri
<< "' to '" << destinationPath << "'";
Try<int> code = net::download(sourceUri, destinationPath);
if (code.isError()) {
return Error("Error downloading resource: " + code.error());
} else {
// The status code for successful HTTP requests is 200, the status code
// for successful FTP file transfers is 226.
if (strings::startsWith(sourceUri, "ftp://") ||
strings::startsWith(sourceUri, "ftps://")) {
if (code.get() != 226) {
return Error("Error downloading resource, received FTP return code " +
} else {
if (code.get() != 200) {
return Error("Error downloading resource, received HTTP return code " +
return destinationPath;
static Try<string> copyFile(
const string& sourcePath,
const string& destinationPath)
const string command = "cp '" + sourcePath + "' '" + destinationPath + "'";
LOG(INFO) << "Copying resource with command:" << command;
int status = os::system(command);
if (status != 0) {
return Error("Failed to copy with command '" + command +
"', exit status: " + stringify(status));
return destinationPath;
static Try<string> download(
const string& _sourceUri,
const string& destinationPath,
const Option<string>& frameworksHome)
// Trim leading whitespace for 'sourceUri'.
const string sourceUri = strings::trim(_sourceUri, strings::PREFIX);
LOG(INFO) << "Fetching URI '" << sourceUri << "'";
Try<Nothing> validation = Fetcher::validateUri(sourceUri);
if (validation.isError()) {
return Error(validation.error());
// 1. Try to fetch using a local copy.
// We regard as local: "file://" or the absense of any URI scheme.
Result<string> sourcePath =
Fetcher::uriToLocalPath(sourceUri, frameworksHome);
if (sourcePath.isError()) {
return Error(sourcePath.error());
} else if (sourcePath.isSome()) {
return copyFile(sourcePath.get(), destinationPath);
// 2. Try to fetch URI using os::net / libcurl implementation.
// We consider http, https, ftp, ftps compatible with libcurl.
if (Fetcher::isNetUri(sourceUri)) {
return downloadWithNet(sourceUri, destinationPath);
// 3. Try to fetch the URI using hadoop client.
// We use the hadoop client to fetch any URIs that are not
// handled by other fetchers(local / os::net). These URIs may be
// `hdfs://` URIs or any other URI that has been configured (and
// hence handled) in the hadoop client. This allows mesos to
// externalize the handling of previously unknown resource
// endpoints without the need to support them natively.
// Note: Hadoop Client is not a hard dependency for running mesos.
// This allows users to get mesos up and running without a
// hadoop_home or the hadoop client setup but in case we reach
// this part and don't have one configured, the fetch would fail
// and log an appropriate error.
return downloadWithHadoopClient(sourceUri, destinationPath);
// TODO(bernd-mesos): Refactor this into stout so that we can more easily
// chmod an executable. For example, we could define some static flags
// so that someone can do: os::chmod(path, EXECUTABLE_CHMOD_FLAGS).
static Try<string> chmodExecutable(const string& filePath)
Try<Nothing> chmod = os::chmod(
if (chmod.isError()) {
return Error("Failed to chmod executable '" +
filePath + "': " + chmod.error());
return filePath;
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetchBypassingCache(
const CommandInfo::URI& uri,
const string& sandboxDirectory,
const Option<string>& frameworksHome)
LOG(INFO) << "Fetching directly into the sandbox directory";
Try<string> basename = Fetcher::basename(uri.value());
if (basename.isError()) {
return Error("Failed to determine the basename of the URI '" +
uri.value() + "' with error: " + basename.error());
string path = path::join(sandboxDirectory, basename.get());
Try<string> downloaded = download(uri.value(), path, frameworksHome);
if (downloaded.isError()) {
return Error(downloaded.error());
if (uri.executable()) {
return chmodExecutable(downloaded.get());
} else if (uri.extract()) {
Try<bool> extracted = extract(path, sandboxDirectory);
if (extracted.isError()) {
return Error(extracted.error());
} else if (!extracted.get()) {
LOG(WARNING) << "Copying instead of extracting resource from URI with "
<< "'extract' flag, because it does not seem to be an "
<< "archive: " << uri.value();
return downloaded;
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetchFromCache(
const FetcherInfo::Item& item,
const string& cacheDirectory,
const string& sandboxDirectory)
LOG(INFO) << "Fetching from cache";
Try<string> basename = Fetcher::basename(item.uri().value());
if (basename.isError()) {
return Error(basename.error());
string destinationPath = path::join(sandboxDirectory, basename.get());
// Non-empty cache filename is guaranteed by the callers of this function.
string sourcePath = path::join(cacheDirectory, item.cache_filename());
if (item.uri().executable()) {
Try<string> copied = copyFile(sourcePath, destinationPath);
if (copied.isError()) {
return Error(copied.error());
return chmodExecutable(copied.get());
} else if (item.uri().extract()) {
Try<bool> extracted = extract(sourcePath, sandboxDirectory);
if (extracted.isError()) {
return Error(extracted.error());
} else if (extracted.get()) {
return sandboxDirectory;
} else {
LOG(WARNING) << "Copying instead of extracting resource from URI with "
<< "'extract' flag, because it does not seem to be an "
<< "archive: " << item.uri().value();
return copyFile(sourcePath, destinationPath);
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetchThroughCache(
const FetcherInfo::Item& item,
const Option<string>& cacheDirectory,
const string& sandboxDirectory,
const Option<string>& frameworksHome)
if (cacheDirectory.isNone() || cacheDirectory.get().empty()) {
return Error("Cache directory not specified");
if (!item.has_cache_filename() || item.cache_filename().empty()) {
// This should never happen if this program is used by the Mesos
// slave and could then be a CHECK. But other uses are possible.
return Error("No cache file name for: " + item.uri().value());
CHECK_NE(FetcherInfo::Item::BYPASS_CACHE, item.action())
<< "Unexpected fetcher action selector";
if (item.action() == FetcherInfo::Item::DOWNLOAD_AND_CACHE) {
LOG(INFO) << "Downloading into cache";
Try<Nothing> mkdir = os::mkdir(cacheDirectory.get());
if (mkdir.isError()) {
return Error("Failed to create fetcher cache directory '" +
cacheDirectory.get() + "': " + mkdir.error());
Try<string> downloaded = download(
path::join(cacheDirectory.get(), item.cache_filename()),
if (downloaded.isError()) {
return Error(downloaded.error());
return fetchFromCache(item, cacheDirectory.get(), sandboxDirectory);
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetch(
const FetcherInfo::Item& item,
const Option<string>& cacheDirectory,
const string& sandboxDirectory,
const Option<string>& frameworksHome)
LOG(INFO) << "Fetching URI '" << item.uri().value() << "'";
if (item.action() == FetcherInfo::Item::BYPASS_CACHE) {
return fetchBypassingCache(
return fetchThroughCache(
// This "fetcher program" is invoked by the slave's fetcher actor
// (Fetcher, FetcherProcess) to "fetch" URIs into the sandbox directory
// of a given task. Its parameters are provided in the form of the env
// var MESOS_FETCHER_INFO which contains a FetcherInfo (see
// fetcher.proto) object formatted in JSON. These are set by the actor
// to indicate what set of URIs to process and how to proceed with
// each one. A URI can be downloaded directly to the task's sandbox
// directory or it can be copied to a cache first or it can be reused
// from the cache, avoiding downloading. All cache management and
// bookkeeping is centralized in the slave's fetcher actor, which can
// have multiple instances of this fetcher program running at any
// given time. Exit code: 0 if entirely successful, otherwise 1.
int main(int argc, char* argv[])
mesos::internal::logging::Flags flags;
Try<Nothing> load = flags.load("MESOS_", argc, argv);
CHECK_SOME(load) << "Could not load flags: " << load.error();
logging::initialize(argv[0], flags, true); // Catch signals.
const Option<string> jsonFetcherInfo = os::getenv("MESOS_FETCHER_INFO");
<< "Missing MESOS_FETCHER_INFO environment variable";
LOG(INFO) << "Fetcher Info: " << jsonFetcherInfo.get();
Try<JSON::Object> parse = JSON::parse<JSON::Object>(jsonFetcherInfo.get());
CHECK_SOME(parse) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
<< "Failed to parse FetcherInfo: " << fetcherInfo.error();
<< "Missing sandbox directory";
const string sandboxDirectory = fetcherInfo.get().sandbox_directory();
const Option<string> cacheDirectory =
fetcherInfo.get().has_cache_directory() ?
Option<string>::some(fetcherInfo.get().cache_directory()) :
const Option<string> frameworksHome =
fetcherInfo.get().has_frameworks_home() ?
Option<string>::some(fetcherInfo.get().frameworks_home()) :
// Fetch each URI to a local file, chmod, then chown if a user is provided.
foreach (const FetcherInfo::Item& item, fetcherInfo.get().items()) {
Try<string> fetched =
fetch(item, cacheDirectory, sandboxDirectory, frameworksHome);
if (fetched.isError()) {
EXIT(1) << "Failed to fetch '" << item.uri().value()
<< "': " + fetched.error();
} else {
LOG(INFO) << "Fetched '" << item.uri().value()
<< "' to '" << fetched.get() << "'";
// Recursively chown the sandbox directory if a user is provided.
if (fetcherInfo.get().has_user()) {
Try<Nothing> chowned = os::chown(
if (chowned.isError()) {
EXIT(1) << "Failed to chown " << sandboxDirectory
<< ": " << chowned.error();
return 0;