// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <list>
#include <string>
#include <vector>
#include <glog/logging.h>
#include <mesos/docker/spec.hpp>
#include <mesos/secret/resolver.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/os.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/executor.hpp>
#include <process/id.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/timer.hpp>
#include "slave/containerizer/mesos/provisioner/constants.hpp"
#include "slave/containerizer/mesos/provisioner/utils.hpp"
#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
#include "uri/fetcher.hpp"
namespace spec = docker::spec;
using std::list;
using std::string;
using std::vector;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::Promise;
using process::defer;
using process::dispatch;
using process::spawn;
using process::terminate;
using process::wait;
namespace mesos {
namespace internal {
namespace slave {
namespace docker {
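// `StoreProcess` is the actor backing the Docker image store. It
// caches pulled images via the `MetadataManager`, deduplicates
// concurrent pulls of the same image, and lays image layers out on
// disk under `--docker_store_dir`.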
class StoreProcess : public Process<StoreProcess>
{
public:
StoreProcess(
const Flags& _flags,
const Owned<MetadataManager>& _metadataManager,
const Owned<Puller>& _puller)
: ProcessBase(process::ID::generate("docker-provisioner-store")),
flags(_flags),
metadataManager(_metadataManager),
puller(_puller)
{
}
~StoreProcess() override {}
Future<Nothing> recover();
Future<ImageInfo> get(
const mesos::Image& image,
const string& backend);
Future<Nothing> prune(
const std::vector<mesos::Image>& excludeImages,
const hashset<string>& activeLayerPaths);
private:
struct Metrics
{
Metrics() :
image_pull(
"containerizer/mesos/provisioner/docker_store/image_pull", Hours(1))
{
process::metrics::add(image_pull);
}
~Metrics()
{
process::metrics::remove(image_pull);
}
process::metrics::Timer<Milliseconds> image_pull;
};
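// Continuation of get(): checks that the cached `image` (if any)
// still has all of its layers on disk and returns it, or (re)pulls
// the image, deduplicating concurrent pulls of the same reference.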
Future<Image> _get(
const spec::ImageReference& reference,
const Option<Secret>& config,
const Option<Image>& image,
const string& backend);
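// Continuation of get(): assembles the `ImageInfo` (layer rootfs
// paths plus the parsed v1 image manifest) for a stored image.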
Future<ImageInfo> __get(
const Image& image,
const string& backend);
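// Moves all layers of `image` (and, for V2 Schema2 images, the image
// manifest config) from the staging directory into the store.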
Future<Image> moveLayers(
const string& staging,
const Image& image,
const string& backend);
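// Moves a single layer from the staging directory into the store,
// converting whiteout files where the backend requires it.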
Future<Nothing> moveLayer(
const string& staging,
const string& layerId,
const string& backend);
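// Continuation of prune(): moves layers that are neither active nor
// retained into the gc directory and sweeps that directory
// asynchronously.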
Future<Nothing> _prune(
const hashset<string>& activeLayerPaths,
const hashset<string>& retainedImageLayers);
const Flags flags;
Owned<MetadataManager> metadataManager;
Owned<Puller> puller;
// The get() method deduplicates image pulls by caching a single Future
// per initiated image pull, handing out `undiscardable(pull)` to each
// caller (to prevent discards by callers from propagating and
// discarding the pull), and tracking, per pending pull, the number of
// handed-out `undiscardable`s that have not themselves been discarded
// yet (`useCount`).
//
// The pull future itself is discarded as soon as its `useCount` drops
// to zero, which notifies the puller that the pull has been cancelled.
// A pull future and its use count.
struct Pull
{
Future<Image> future;
size_t useCount;
};
// The pending pull futures are keyed by the stringified `ImageReference`
// for the image being pulled.
hashmap<string, Pull> pulling;
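// For example, two concurrent `get()` calls for the same image share a
// single underlying pull (a sketch of the intended semantics):
//
//   Future<ImageInfo> a = store->get(image, backend); // starts pull
//   Future<ImageInfo> b = store->get(image, backend); // useCount: 2
//   a.discard(); // useCount: 1, the shared pull keeps running
//   b.discard(); // useCount: 0, the underlying pull is discarded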
// For executing path removals in a separate actor.
process::Executor executor;
Metrics metrics;
};
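// Creates the store together with its default puller, which is backed
// by a URI fetcher constructed here from the agent flags.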
Try<Owned<slave::Store>> Store::create(
const Flags& flags,
SecretResolver* secretResolver)
{
// TODO(jieyu): We should inject URI fetcher from top level, instead
// of creating it here.
uri::fetcher::Flags _flags;
// TODO(dpravat): Remove after resolving MESOS-5473.
#ifndef __WINDOWS__
_flags.docker_config = flags.docker_config;
_flags.docker_stall_timeout = flags.fetcher_stall_timeout;
#endif
if (flags.hadoop_home.isSome()) {
_flags.hadoop_client = path::join(flags.hadoop_home.get(), "bin", "hadoop");
}
Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(_flags);
if (fetcher.isError()) {
return Error("Failed to create the URI fetcher: " + fetcher.error());
}
Try<Owned<Puller>> puller =
Puller::create(flags, fetcher->share(), secretResolver);
if (puller.isError()) {
return Error("Failed to create Docker puller: " + puller.error());
}
Try<Owned<slave::Store>> store = Store::create(flags, puller.get());
if (store.isError()) {
return Error("Failed to create Docker store: " + store.error());
}
return store.get();
}
Try<Owned<slave::Store>> Store::create(
const Flags& flags,
const Owned<Puller>& puller)
{
Try<Nothing> mkdir = os::mkdir(flags.docker_store_dir);
if (mkdir.isError()) {
return Error("Failed to create Docker store directory: " +
mkdir.error());
}
mkdir = os::mkdir(paths::getStagingDir(flags.docker_store_dir));
if (mkdir.isError()) {
return Error("Failed to create Docker store staging directory: " +
mkdir.error());
}
mkdir = os::mkdir(paths::getGcDir(flags.docker_store_dir));
if (mkdir.isError()) {
return Error("Failed to create Docker store gc directory: " +
mkdir.error());
}
Try<Owned<MetadataManager>> metadataManager = MetadataManager::create(flags);
if (metadataManager.isError()) {
return Error(metadataManager.error());
}
Owned<StoreProcess> process(
new StoreProcess(flags, metadataManager.get(), puller));
return Owned<slave::Store>(new Store(process));
}
Store::Store(Owned<StoreProcess> _process) : process(_process)
{
spawn(CHECK_NOTNULL(process.get()));
}
Store::~Store()
{
terminate(process.get());
wait(process.get());
}
Future<Nothing> Store::recover()
{
return dispatch(process.get(), &StoreProcess::recover);
}
Future<ImageInfo> Store::get(
const mesos::Image& image,
const string& backend)
{
return dispatch(process.get(), &StoreProcess::get, image, backend);
}
Future<Nothing> Store::prune(
const vector<mesos::Image>& excludedImages,
const hashset<string>& activeLayerPaths)
{
return dispatch(
process.get(), &StoreProcess::prune, excludedImages, activeLayerPaths);
}
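// Recovery delegates to the metadata manager, which reloads the
// persisted image metadata from disk.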
Future<Nothing> StoreProcess::recover()
{
return metadataManager->recover();
}
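// The lookup is a three-stage chain: get() parses the image reference
// and consults the metadata manager, _get() validates the cached image
// or pulls it, and __get() assembles the resulting `ImageInfo`.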
Future<ImageInfo> StoreProcess::get(
const mesos::Image& image,
const string& backend)
{
if (image.type() != mesos::Image::DOCKER) {
return Failure("Docker provisioner store only supports Docker images");
}
Try<spec::ImageReference> reference =
spec::parseImageReference(image.docker().name());
if (reference.isError()) {
return Failure("Failed to parse docker image '" + image.docker().name() +
"': " + reference.error());
}
return metadataManager->get(reference.get(), image.cached())
.then(defer(self(),
&Self::_get,
reference.get(),
image.docker().has_config()
? image.docker().config()
: Option<Secret>(),
lambda::_1,
backend))
.then(defer(self(), &Self::__get, lambda::_1, backend));
}
Future<Image> StoreProcess::_get(
const spec::ImageReference& reference,
const Option<Secret>& config,
const Option<Image>& image,
const string& backend)
{
// NOTE: Here, we assume that image layers are never removed without
// the corresponding metadata in the metadata manager being removed
// first. Otherwise, the image we return here might be missing some
// layers. When we introduce cache eviction, we also want to avoid the
// situation where a layer has been returned to the provisioner but is
// later evicted.
if (image.isSome()) {
// It is possible for a layer to be missing after recovery if the
// agent flag `--image_provisioner_backend` is changed from a
// specified backend to `None()`. We need to check that each layer
// of a cached image exists.
bool layerMissed = false;
foreach (const string& layerId, image->layer_ids()) {
const string rootfsPath = paths::getImageLayerRootfsPath(
flags.docker_store_dir,
layerId,
backend);
if (!os::exists(rootfsPath)) {
layerMissed = true;
break;
}
}
if (image->has_config_digest() &&
!os::exists(paths::getImageLayerPath(
flags.docker_store_dir,
image->config_digest()))) {
layerMissed = true;
}
if (!layerMissed) {
LOG(INFO) << "Using cached image '" << reference << "'";
return image.get();
}
}
const string pullKey = stringify(reference);
Future<Image> pull = [&]() -> Future<Image> {
if (pulling.contains(pullKey)) {
pulling.at(pullKey).useCount += 1;
// NOTE: In the rare case when the future has already failed but the
// onAny() callback has not been executed yet, we will hand out an already
// FAILED future. This is basically equivalent to handing out a PENDING
// future that will fail immediately, and should not be an issue.
CHECK(!pulling.at(pullKey).future.hasDiscard());
return pulling.at(pullKey).future;
}
Try<string> staging =
os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
if (staging.isError()) {
return Failure(
"Failed to create a staging directory: " + staging.error());
}
LOG(INFO) << "Pulling image '" << reference << "'";
Future<Image> pull = metrics.image_pull.time(
puller->pull(reference, staging.get(), backend, config)
.then(defer(
self(), &Self::moveLayers, staging.get(), lambda::_1, backend))
.then(defer(
self(),
[=](const Image& image) {
LOG(INFO) << "Caching image '" << reference << "'";
return metadataManager->put(image);
}))
.onAny(defer(self(), [=](const Future<Image>& image) {
pulling.erase(pullKey);
LOG(INFO) << "Removing staging directory '" << staging.get() << "'";
Try<Nothing> rmdir = os::rmdir(staging.get());
if (rmdir.isError()) {
LOG(WARNING) << "Failed to remove staging directory '"
<< staging.get() << "': " << rmdir.error();
}
})));
pulling[pullKey] = {pull, 1};
return pull;
}();
// Each call of this method returns an `undiscardable` future referencing the
// `pull` future. The `pull` future itself will be discarded as soon as
// all the `undiscardable`s have been discarded.
return undiscardable(pull)
.onDiscard(defer(self(), [=]() {
if (pulling.contains(pullKey)) {
pulling.at(pullKey).useCount -= 1;
if (pulling.at(pullKey).useCount == 0) {
pulling.at(pullKey).future.discard();
pulling.erase(pullKey);
}
}
}));
}
Future<ImageInfo> StoreProcess::__get(
const Image& image,
const string& backend)
{
CHECK_LT(0, image.layer_ids_size());
vector<string> layerPaths;
foreach (const string& layerId, image.layer_ids()) {
layerPaths.push_back(paths::getImageLayerRootfsPath(
flags.docker_store_dir,
layerId,
backend));
}
string configPath;
if (image.has_config_digest()) {
// The optional 'config_digest' is only set for the docker manifest
// V2 Schema2 case.
configPath = paths::getImageLayerPath(
flags.docker_store_dir,
image.config_digest());
} else {
// Read the manifest from the last layer because all runtime
// configuration has already been merged into the leaf layer.
configPath = paths::getImageLayerManifestPath(
flags.docker_store_dir,
image.layer_ids(image.layer_ids_size() - 1));
}
Try<string> manifest = os::read(configPath);
if (manifest.isError()) {
return Failure(
"Failed to read manifest from '" + configPath + "': " +
manifest.error());
}
Try<::docker::spec::v1::ImageManifest> v1 =
::docker::spec::v1::parse(manifest.get());
if (v1.isError()) {
return Failure(
"Failed to parse docker v1 manifest from '" + configPath + "': " +
v1.error());
}
if (image.has_config_digest()) {
return ImageInfo{layerPaths, v1.get(), None(), configPath};
}
return ImageInfo{layerPaths, v1.get()};
}
Future<Image> StoreProcess::moveLayers(
const string& staging,
const Image& image,
const string& backend)
{
LOG(INFO) << "Moving layers from staging directory '" << staging
<< "' to image store for image '" << image.reference() << "'";
vector<Future<Nothing>> futures;
foreach (const string& layerId, image.layer_ids()) {
futures.push_back(moveLayer(staging, layerId, backend));
}
return collect(futures)
.then(defer(self(), [=]() -> Future<Image> {
if (image.has_config_digest()) {
const string configSource = path::join(staging, image.config_digest());
const string configTarget = paths::getImageLayerPath(
flags.docker_store_dir,
image.config_digest());
if (!os::exists(configTarget)) {
Try<Nothing> rename = os::rename(configSource, configTarget);
if (rename.isError()) {
return Failure(
"Failed to move image manifest config from '" + configSource +
"' to '" + configTarget + "': " + rename.error());
}
}
}
return image;
}));
}
Future<Nothing> StoreProcess::moveLayer(
const string& staging,
const string& layerId,
const string& backend)
{
const string source = path::join(staging, layerId);
// This is the case where the puller skipped pulling the layer
// because the layer already exists in the store.
//
// TODO(jieyu): Verify that the layer is actually in the store.
if (!os::exists(source)) {
return Nothing();
}
const string targetRootfs = paths::getImageLayerRootfsPath(
flags.docker_store_dir,
layerId,
backend);
// NOTE: Since layer ids are expected to be unique, if the layer
// already exists in the store we skip the move; the contents are
// expected to be identical.
if (os::exists(targetRootfs)) {
return Nothing();
}
const string sourceRootfs = paths::getImageLayerRootfsPath(source, backend);
const string target = paths::getImageLayerPath(
flags.docker_store_dir,
layerId);
#ifdef __linux__
// If the backend is "overlay", we need to convert
// AUFS whiteout files to OverlayFS whiteout files.
if (backend == OVERLAY_BACKEND) {
Try<Nothing> convert = convertWhiteouts(sourceRootfs);
if (convert.isError()) {
return Failure(
"Failed to convert the whiteout files under '" +
sourceRootfs + "': " + convert.error());
}
}
#endif
if (!os::exists(target)) {
// This is the case where the layer is pulled for the first time.
Try<Nothing> mkdir = os::mkdir(target);
if (mkdir.isError()) {
return Failure(
"Failed to create directory in store for layer '" +
layerId + "': " + mkdir.error());
}
Try<Nothing> rename = os::rename(source, target);
if (rename.isError()) {
return Failure(
"Failed to move layer from '" + source +
"' to '" + target + "': " + rename.error());
}
} else {
// This is the case where the layer has already been pulled with a
// different backend.
Try<Nothing> rename = os::rename(sourceRootfs, targetRootfs);
if (rename.isError()) {
return Failure(
"Failed to move rootfs from '" + sourceRootfs +
"' to '" + targetRootfs + "': " + rename.error());
}
}
return Nothing();
}
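// Pruning is a mark-and-sweep: prune() asks the metadata manager which
// layers are retained by the excluded images, then _prune() renames
// all other inactive layers into the gc directory and sweeps that
// directory asynchronously.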
Future<Nothing> StoreProcess::prune(
const vector<mesos::Image>& excludedImages,
const hashset<string>& activeLayerPaths)
{
// All in-flight pulls should have finished before pruning.
if (!pulling.empty()) {
return Failure("Cannot prune and pull at the same time");
}
vector<spec::ImageReference> imageReferences;
imageReferences.reserve(excludedImages.size());
foreach (const mesos::Image& image, excludedImages) {
Try<spec::ImageReference> reference =
spec::parseImageReference(image.docker().name());
if (reference.isError()) {
return Failure(
"Failed to parse docker image '" + image.docker().name() +
"': " + reference.error());
}
imageReferences.push_back(reference.get());
}
return metadataManager->prune(imageReferences)
.then(defer(self(), &Self::_prune, activeLayerPaths, lambda::_1));
}
Future<Nothing> StoreProcess::_prune(
const hashset<string>& activeLayerRootfses,
const hashset<string>& retainedLayerIds)
{
Try<list<string>> allLayers = paths::listLayers(flags.docker_store_dir);
if (allLayers.isError()) {
return Failure("Failed to find all layer paths: " + allLayers.error());
}
// The paths handed in by the provisioner are layer rootfs paths.
// Normalize them to layer paths.
hashset<string> activeLayerPaths;
foreach (const string& rootfsPath, activeLayerRootfses) {
activeLayerPaths.insert(Path(rootfsPath).dirname());
}
foreach (const string& layerId, allLayers.get()) {
if (retainedLayerIds.contains(layerId)) {
VLOG(1) << "Layer '" << layerId << "' is retained by image store cache";
continue;
}
const string layerPath =
paths::getImageLayerPath(flags.docker_store_dir, layerId);
if (activeLayerPaths.contains(layerPath)) {
VLOG(1) << "Layer '" << layerId << "' is retained by active container";
continue;
}
const string target =
paths::getGcLayerPath(flags.docker_store_dir, layerId);
if (os::exists(target)) {
return Failure("Marking phase target '" + target + "' already exists");
}
VLOG(1) << "Marking layer '" << layerId << "' to gc by renaming '"
<< layerPath << "' to '" << target << "'";
Try<Nothing> rename = os::rename(layerPath, target);
if (rename.isError()) {
return Failure(
"Failed to move layer from '" + layerPath +
"' to '" + target + "': " + rename.error());
}
}
const string gcDir = paths::getGcDir(flags.docker_store_dir);
auto rmdirs = [gcDir]() {
Try<list<string>> targets = os::ls(gcDir);
if (targets.isError()) {
LOG(WARNING) << "Error when listing gcDir '" << gcDir
<< "': " << targets.error();
return Nothing();
}
foreach (const string& target, targets.get()) {
const string path = path::join(gcDir, target);
// Run each removal with 'continueOnError = false'. It is possible
// that we incorrectly marked a layer that is still in use by a
// layer-based backend (aufs, overlay); in that case the removal
// fails, we log a warning and move on, freeing up as much disk
// space as possible.
LOG(INFO) << "Deleting path '" << path << "'";
Try<Nothing> rmdir = os::rmdir(path, true, true, false);
if (rmdir.isError()) {
LOG(WARNING) << "Failed to delete '" << path << "': "
<< rmdir.error();
} else {
LOG(INFO) << "Deleted '" << path << "'";
}
}
return Nothing();
};
// NOTE: All `rmdirs` calls are dispatched to one executor so that:
// 1. They do not block other dispatches;
// 2. They do not occupy all worker threads.
executor.execute(rmdirs);
return Nothing();
}
} // namespace docker {
} // namespace slave {
} // namespace internal {
} // namespace mesos {