// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <process/collect.hpp>
#include <process/id.hpp>
#include <stout/os.hpp>
#include "slave/flags.hpp"
#include "slave/state.hpp"
#include "slave/containerizer/mesos/isolators/docker/volume/isolator.hpp"
namespace paths = mesos::internal::slave::docker::volume::paths;
using std::list;
using std::string;
using std::vector;
using process::Failure;
using process::Future;
using process::Owned;
using process::PID;
using mesos::internal::slave::docker::volume::DriverClient;
using mesos::slave::ContainerConfig;
using mesos::slave::ContainerLaunchInfo;
using mesos::slave::ContainerLimitation;
using mesos::slave::ContainerState;
using mesos::slave::Isolator;
namespace mesos {
namespace internal {
namespace slave {
DockerVolumeIsolatorProcess::DockerVolumeIsolatorProcess(
const Flags& _flags,
const string& _rootDir,
const Owned<DriverClient>& _client)
: ProcessBase(process::ID::generate("docker-volume-isolator")),
flags(_flags),
rootDir(_rootDir),
client(_client) {}
DockerVolumeIsolatorProcess::~DockerVolumeIsolatorProcess() {}
Try<Isolator*> DockerVolumeIsolatorProcess::create(const Flags& flags)
{
// Check for root permission.
if (geteuid() != 0) {
return Error("The 'docker/volume' isolator requires root permissions");
}
// TODO(gyliu513): Check the dvdcli version; it needs to be greater
// than or equal to 0.2.0.
Option<string> dvdcli = os::which("dvdcli");
if (dvdcli.isNone()) {
return Error("The 'docker/volume' isolator cannot get dvdcli command");
}
VLOG(1) << "Found 'dvdcli' at '" << dvdcli.get() << "'";
Try<Owned<DriverClient>> client = DriverClient::create(dvdcli.get());
if (client.isError()) {
return Error(
"Unable to create docker volume driver client: " + client.error());
}
Try<Isolator*> isolator =
DockerVolumeIsolatorProcess::_create(flags, client.get());
if (isolator.isError()) {
return Error(isolator.error());
}
return isolator.get();
}
Try<Isolator*> DockerVolumeIsolatorProcess::_create(
const Flags& flags,
const Owned<DriverClient>& client)
{
// Create the docker volume information root directory if it does
// not exist. This directory is used to checkpoint the docker
// volumes used by containers.
Try<Nothing> mkdir = os::mkdir(flags.docker_volume_checkpoint_dir);
if (mkdir.isError()) {
return Error(
"Failed to create docker volume information root directory at '" +
flags.docker_volume_checkpoint_dir + "': " + mkdir.error());
}
Result<string> rootDir = os::realpath(flags.docker_volume_checkpoint_dir);
if (!rootDir.isSome()) {
return Error(
"Failed to determine canonical path of docker volume information root "
"directory at '" + flags.docker_volume_checkpoint_dir + "': " +
(rootDir.isError() ? rootDir.error() : "No such file or directory"));
}
VLOG(1) << "Initialized the docker volume information root directory at '"
<< rootDir.get() << "'";
Owned<MesosIsolatorProcess> process(
new DockerVolumeIsolatorProcess(
flags,
rootDir.get(),
client));
return new MesosIsolator(process);
}
Future<Nothing> DockerVolumeIsolatorProcess::recover(
const list<ContainerState>& states,
const hashset<ContainerID>& orphans)
{
if (!os::exists(rootDir)) {
VLOG(1) << "The checkpoint directory at '" << rootDir
<< "' does not exist. Skipping recovery.";
return Nothing();
}
foreach (const ContainerState& state, states) {
const ContainerID& containerId = state.container_id();
Try<Nothing> recover = _recover(containerId);
if (recover.isError()) {
return Failure(
"Failed to recover docker volumes for container " +
stringify(containerId) + ": " + recover.error());
}
}
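// Scan the checkpoint directory to find containers that are not in
// the list of known container states; any such container is treated
// as an orphan below.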
Try<list<string>> entries = os::ls(rootDir);
if (entries.isError()) {
return Failure(
"Unable to list docker volume checkpoint directory '" +
rootDir + "': " + entries.error());
}
foreach (const string& entry, entries.get()) {
ContainerID containerId;
containerId.set_value(Path(entry).basename());
if (infos.contains(containerId)) {
continue;
}
// Recover docker volume information for an orphan container.
Try<Nothing> recover = _recover(containerId);
if (recover.isError()) {
return Failure(
"Failed to recover docker volumes for orphan container " +
stringify(containerId) + ": " + recover.error());
}
// Known orphan containers will be cleaned up by the containerizer
// using the normal cleanup path. See MESOS-2367 for details.
if (orphans.contains(containerId)) {
continue;
}
LOG(INFO) << "Cleanup volumes for unknown orphaned "
<< "container " << containerId;
cleanup(containerId);
}
return Nothing();
}
Try<Nothing> DockerVolumeIsolatorProcess::_recover(
const ContainerID& containerId)
{
// NOTE: This method will add an 'Info' to 'infos' only if the
// container was launched by the docker volume isolator.
const string containerDir =
paths::getContainerDir(rootDir, containerId.value());
if (!os::exists(containerDir)) {
// This may occur in the following cases:
// 1. The executor exited and the isolator removed the container
// directory in '_cleanup()', but the agent died before noticing
// this.
// 2. The agent died before the isolator created the container
// directory in 'prepare()'.
// In either case there is nothing to clean up after the agent
// restarts, so we do not need to do anything here.
return Nothing();
}
const string volumesPath =
paths::getVolumesPath(rootDir, containerId.value());
if (!os::exists(volumesPath)) {
// This could happen if the slave died after creating the container
// directory but before it checkpointed anything in it.
LOG(WARNING) << "The docker volumes checkpointed at '" << volumesPath
<< "' for container " << containerId << " does not exist";
// Construct an info object with empty docker volumes since no
// docker volumes are mounted yet for this container, and this
// container will be cleaned up by the containerizer (as a known
// orphan container) or by `recover` (as an unknown orphan
// container).
infos.put(containerId, Owned<Info>(new Info(hashset<DockerVolume>())));
return Nothing();
}
Try<string> read = os::read(volumesPath);
if (read.isError()) {
return Error(
"Failed to read docker volumes checkpoint file '" +
volumesPath + "': " + read.error());
}
if (read->empty()) {
// This could happen if the slave is hard rebooted after the file is
// created but before the data is synced on disk.
LOG(WARNING) << "The docker volumes checkpointed at '" << volumesPath
<< "' for container " << containerId << " is empty";
// Construct an info object with empty docker volumes since no
// docker volumes are mounted yet for this container, and this
// container will be cleaned up by the containerizer (as a known
// orphan container) or by `recover` (as an unknown orphan
// container).
infos.put(containerId, Owned<Info>(new Info(hashset<DockerVolume>())));
return Nothing();
}
Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
if (json.isError()) {
return Error("JSON parse failed: " + json.error());
}
Try<DockerVolumes> parse = ::protobuf::parse<DockerVolumes>(json.get());
if (parse.isError()) {
return Error("Protobuf parse failed: " + parse.error());
}
hashset<DockerVolume> volumes;
foreach (const DockerVolume& volume, parse->volumes()) {
VLOG(1) << "Recovering docker volume with driver '"
<< volume.driver() << "' and name '" << volume.name()
<< "' for container " << containerId;
if (volumes.contains(volume)) {
return Error(
"Duplicate docker volume with driver '" + volume.driver() + "' "
"and name '" + volume.name() + "'");
}
volumes.insert(volume);
}
infos.put(containerId, Owned<Info>(new Info(volumes)));
return Nothing();
}
Future<Option<ContainerLaunchInfo>> DockerVolumeIsolatorProcess::prepare(
const ContainerID& containerId,
const ContainerConfig& containerConfig)
{
const ExecutorInfo& executorInfo = containerConfig.executor_info();
if (!executorInfo.has_container()) {
return None();
}
if (executorInfo.container().type() != ContainerInfo::MESOS) {
return Failure(
"Can only prepare docker volume driver for a MESOS container");
}
// This hashset is used to detect duplicate docker volumes
// specified for the same container.
hashset<DockerVolume> volumes;
// Represents mounts that will be sent to the driver client.
struct Mount
{
DockerVolume volume;
hashmap<string, string> options;
};
vector<Mount> mounts;
// The mount points in the container.
vector<string> targets;
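// Walk the volumes declared by the executor and collect, for each
// docker volume, the driver options to pass to the driver client and
// the mount target inside the container.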
foreach (const Volume& _volume, executorInfo.container().volumes()) {
if (!_volume.has_source()) {
continue;
}
if (_volume.source().type() != Volume::Source::DOCKER_VOLUME) {
VLOG(1) << "Ignored volume type '" << _volume.source().type()
<< "' for container " << containerId << " as only "
<< "'DOCKER_VOLUME' was supported by the docker "
<< "volume isolator";
continue;
}
const string& name = _volume.source().docker_volume().name();
if (!_volume.source().docker_volume().has_driver()) {
return Failure("The volume driver is not specified for volume '" +
name + "' with container " + stringify(containerId));
}
const string& driver = _volume.source().docker_volume().driver();
DockerVolume volume;
volume.set_driver(driver);
volume.set_name(name);
if (volumes.contains(volume)) {
return Failure(
"Found duplicate docker volume with driver '" +
driver + "' and name '" + name + "'");
}
// Determine driver options.
hashmap<string, string> options;
if (_volume.source().docker_volume().has_driver_options()) {
foreach (const Parameter& parameter,
_volume.source().docker_volume().driver_options().parameter()) {
options[parameter.key()] = parameter.value();
}
}
// Determine the target of the mount.
string target;
// The logic to determine a volume mount target is identical to
// that of the linux filesystem isolator, because the docker volume
// isolator depends on that isolator and assumes that, if the
// container specifies a rootfs, the sandbox is already bind
// mounted into the container.
if (path::absolute(_volume.container_path())) {
// Operators may define the 'container_path' of a docker volume
// either as an absolute path or as a relative path. Please see
// the linux filesystem isolator for details.
if (containerConfig.has_rootfs()) {
target = path::join(
containerConfig.rootfs(),
_volume.container_path());
Try<Nothing> mkdir = os::mkdir(target);
if (mkdir.isError()) {
return Failure(
"Failed to create the target of the mount at '" +
target + "': " + mkdir.error());
}
} else {
target = _volume.container_path();
if (!os::exists(target)) {
return Failure("Absolute container path '" + target + "' "
"does not exist");
}
}
} else {
if (containerConfig.has_rootfs()) {
target = path::join(containerConfig.rootfs(),
flags.sandbox_directory,
_volume.container_path());
} else {
target = path::join(containerConfig.directory(),
_volume.container_path());
}
// NOTE: We cannot create the mount point at 'target' if the
// container has a rootfs defined, because the bind mount of the
// sandbox would hide what's inside 'target'. So we always create
// the mount point in 'directory' instead.
string mountPoint = path::join(
containerConfig.directory(),
_volume.container_path());
Try<Nothing> mkdir = os::mkdir(mountPoint);
if (mkdir.isError()) {
return Failure(
"Failed to create the target of the mount at '" +
mountPoint + "': " + mkdir.error());
}
}
Mount mount;
mount.volume = volume;
mount.options = options;
volumes.insert(volume);
mounts.push_back(mount);
targets.push_back(target);
}
// It is possible that no external volume is specified for this
// container. In that case we avoid checkpointing empty state and
// creating an empty `Info`.
if (volumes.empty()) {
return None();
}
// Create the container directory.
const string containerDir =
paths::getContainerDir(rootDir, containerId.value());
Try<Nothing> mkdir = os::mkdir(containerDir);
if (mkdir.isError()) {
return Failure(
"Failed to create the container directory at '" +
containerDir + "': " + mkdir.error());
}
// Create DockerVolumes protobuf message to checkpoint.
DockerVolumes state;
foreach (const DockerVolume& volume, volumes) {
state.add_volumes()->CopyFrom(volume);
}
const string volumesPath =
paths::getVolumesPath(rootDir, containerId.value());
Try<Nothing> checkpoint = state::checkpoint(
volumesPath,
stringify(JSON::protobuf(state)));
if (checkpoint.isError()) {
return Failure(
"Failed to checkpoint docker volumes at '" +
volumesPath + "': " + checkpoint.error());
}
VLOG(1) << "Successfully created checkpoint at '" << volumesPath << "'";
infos.put(containerId, Owned<Info>(new Info(volumes)));
// Invoke the driver client to create the mounts.
list<Future<string>> futures;
foreach (const Mount& mount, mounts) {
futures.push_back(this->mount(
mount.volume.driver(),
mount.volume.name(),
mount.options));
}
// NOTE: Wait for all `mount()` calls to finish before returning,
// to make sure `unmount()` is not called (via 'cleanup()') while
// some mount is still pending.
return await(futures)
.then(defer(
PID<DockerVolumeIsolatorProcess>(this),
&DockerVolumeIsolatorProcess::_prepare,
containerId,
targets,
lambda::_1));
}
Future<Option<ContainerLaunchInfo>> DockerVolumeIsolatorProcess::_prepare(
const ContainerID& containerId,
const vector<string>& targets,
const list<Future<string>>& futures)
{
ContainerLaunchInfo launchInfo;
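// Ask the containerizer to clone the container with a new mount
// namespace so that the bind mounts added as pre-exec commands below
// stay private to the container being launched.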
launchInfo.add_clone_namespaces(CLONE_NEWNS);
vector<string> messages;
vector<string> sources;
foreach (const Future<string>& future, futures) {
if (!future.isReady()) {
messages.push_back(future.isFailed() ? future.failure() : "discarded");
continue;
}
sources.push_back(strings::trim(future.get()));
}
if (!messages.empty()) {
return Failure(strings::join("\n", messages));
}
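// Each successful `mount()` returned the host mount point of the
// corresponding volume. `sources` and `targets` are in the same
// order because both were populated while iterating over the volumes
// in `prepare()`.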
CHECK_EQ(sources.size(), targets.size());
for (size_t i = 0; i < sources.size(); i++) {
const string& source = sources[i];
const string& target = targets[i];
LOG(INFO) << "Mounting docker volume mount point '" << source
<< "' to '" << target << "' for container " << containerId;
// Launch the mount command as a non-shell subprocess to avoid
// shell command injection (e.g., a user-defined 'container_path'
// in the volume could be suffixed with arbitrary unsafe
// commands).
CommandInfo* command = launchInfo.add_pre_exec_commands();
command->set_shell(false);
command->set_value("mount");
command->add_arguments("mount");
command->add_arguments("-n");
command->add_arguments("--rbind");
command->add_arguments(source);
command->add_arguments(target);
}
return launchInfo;
}
Future<Nothing> DockerVolumeIsolatorProcess::cleanup(
const ContainerID& containerId)
{
if (!infos.contains(containerId)) {
VLOG(1) << "Ignoring cleanup request for unknown container " << containerId;
return Nothing();
}
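// Count how many containers (including this one) currently reference
// each volume; a volume is only unmounted below if this container is
// its sole user.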
hashmap<DockerVolume, int> references;
foreachvalue (const Owned<Info>& info, infos) {
foreach (const DockerVolume& volume, info->volumes) {
if (!references.contains(volume)) {
references[volume] = 1;
} else {
references[volume]++;
}
}
}
list<Future<Nothing>> futures;
foreach (const DockerVolume& volume, infos[containerId]->volumes) {
if (references.contains(volume) && references[volume] > 1) {
VLOG(1) << "Cannot unmount the volume with driver '"
<< volume.driver() << "' and name '" << volume.name()
<< "' for container " << containerId
<< " since its reference count is " << references[volume];
continue;
}
LOG(INFO) << "Unmounting the volume with driver '"
<< volume.driver() << "' and name '" << volume.name()
<< "' for container " << containerId;
// Invoke dvdcli client to unmount the docker volume.
futures.push_back(this->unmount(volume.driver(), volume.name()));
}
// Erase the `Info` struct of this container before unmounting the
// volumes. This ensures that a volume's reference count is not
// wrongly inflated if unmounting fails; otherwise, the next time
// another container using the same volume is destroyed, we would
// NOT unmount the volume since its reference count would still be
// larger than 1.
infos.erase(containerId);
return await(futures)
.then(defer(
PID<DockerVolumeIsolatorProcess>(this),
&DockerVolumeIsolatorProcess::_cleanup,
containerId,
lambda::_1));
}
Future<Nothing> DockerVolumeIsolatorProcess::_cleanup(
const ContainerID& containerId,
const list<Future<Nothing>>& futures)
{
vector<string> messages;
foreach (const Future<Nothing>& future, futures) {
if (!future.isReady()) {
messages.push_back(future.isFailed() ? future.failure() : "discarded");
}
}
if (!messages.empty()) {
return Failure(strings::join("\n", messages));
}
const string containerDir =
paths::getContainerDir(rootDir, containerId.value());
Try<Nothing> rmdir = os::rmdir(containerDir);
if (rmdir.isError()) {
return Failure(
"Failed to remove the checkpoint directory at '" +
containerDir + "': " + rmdir.error());
}
LOG(INFO) << "Removed the checkpoint directory at '" << containerDir
<< "' for container " << containerId;
return Nothing();
}
Future<string> DockerVolumeIsolatorProcess::mount(
const string& driver,
const string& name,
const hashmap<string, string>& options)
{
DockerVolume volume;
volume.set_driver(driver);
volume.set_name(name);
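// Queue the mount in the per-volume `Sequence` so that mounts and
// unmounts of the same volume are serialized and never interleave.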
return sequences[volume].add<string>(
defer(PID<DockerVolumeIsolatorProcess>(this), [=]() -> Future<string> {
return _mount(driver, name, options);
}));
}
Future<string> DockerVolumeIsolatorProcess::_mount(
const string& driver,
const string& name,
const hashmap<string, string>& options)
{
return client->mount(driver, name, options);
}
Future<Nothing> DockerVolumeIsolatorProcess::unmount(
const string& driver,
const string& name)
{
DockerVolume volume;
volume.set_driver(driver);
volume.set_name(name);
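// Queue the unmount in the same per-volume `Sequence` used by
// `mount()` so that the two operations on a given volume never race.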
return sequences[volume].add<Nothing>(
defer(PID<DockerVolumeIsolatorProcess>(this), [=]() -> Future<Nothing> {
return _unmount(driver, name);
}));
}
Future<Nothing> DockerVolumeIsolatorProcess::_unmount(
const string& driver,
const string& name)
{
return client->unmount(driver, name);
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {