// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <list>
#include <string>
#include <vector>

#include <process/collect.hpp>
#include <process/future.hpp>
#include <process/owned.hpp>
#include <stout/os.hpp>
#include <stout/os/realpath.hpp>

#include "common/protobuf_utils.hpp"
#include "linux/fs.hpp"
#include "linux/ns.hpp"
#include "slave/state.hpp"
#include "slave/containerizer/mesos/isolators/volume/csi/isolator.hpp"
#include "slave/containerizer/mesos/isolators/volume/csi/paths.hpp"
using std::list;
using std::string;
using std::vector;

using process::Failure;
using process::Future;
using process::Owned;
using process::PID;

using mesos::slave::ContainerConfig;
using mesos::slave::ContainerLaunchInfo;
using mesos::slave::ContainerState;
using mesos::slave::Isolator;

namespace mesos {
namespace internal {
namespace slave {
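
// Shorthand for the access mode of a CSI volume capability.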
using AccessMode = Volume::Source::CSIVolume::VolumeCapability::AccessMode;

Try<Isolator*> VolumeCSIIsolatorProcess::create(
const Flags& flags,
CSIServer* csiServer)
{
if (!strings::contains(flags.isolation, "filesystem/linux")) {
return Error("'filesystem/linux' isolator must be used");
}
if (csiServer == nullptr) {
return Error("No CSI server is provided");
}
const string csiRootDir = path::join(flags.work_dir, csi::paths::CSI_DIR);
// Create the CSI volume information root directory if it does not exist.
Try<Nothing> mkdir = os::mkdir(csiRootDir);
if (mkdir.isError()) {
return Error(
"Failed to create CSI volume information root directory at '" +
csiRootDir + "': " + mkdir.error());
}
Result<string> rootDir = os::realpath(csiRootDir);
if (!rootDir.isSome()) {
return Error(
"Failed to determine canonical path of CSI volume information root"
" directory '" + csiRootDir + "': " +
(rootDir.isError() ? rootDir.error() : "No such file or directory"));
}
Owned<MesosIsolatorProcess> process(new VolumeCSIIsolatorProcess(
flags,
csiServer,
rootDir.get()));
return new MesosIsolator(process);
}
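
// This isolator supports nested containers: `prepare()` is invoked for
// nested containers as well, and each container sets up its CSI volume
// mounts in its own mount namespace (see `_prepare()`).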
bool VolumeCSIIsolatorProcess::supportsNesting()
{
return true;
}
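
// Recovery runs in three passes: rebuild the `Info` structs for known
// containers, then for known orphans, and finally scan the checkpoint
// directory for unknown orphans, which are recovered and then cleaned up.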
Future<Nothing> VolumeCSIIsolatorProcess::recover(
const vector<ContainerState>& states,
const hashset<ContainerID>& orphans)
{
foreach (const ContainerState& state, states) {
const ContainerID& containerId = state.container_id();
Try<Nothing> recover = recoverContainer(containerId);
if (recover.isError()) {
return Failure(
"Failed to recover CSI volumes for container " +
stringify(containerId) + ": " + recover.error());
}
}
  // Recover any orphan containers that we might have checkpointed.
// These orphan containers will be destroyed by the containerizer
// through the regular cleanup path. See MESOS-2367 for details.
foreach (const ContainerID& containerId, orphans) {
Try<Nothing> recover = recoverContainer(containerId);
if (recover.isError()) {
return Failure(
"Failed to recover CSI volumes for orphan container " +
stringify(containerId) + ": " + recover.error());
}
}
// Walk through all the checkpointed containers to determine if
// there are any unknown orphan containers.
Try<list<string>> entries = os::ls(rootDir);
if (entries.isError()) {
return Failure(
"Unable to list CSI volume checkpoint directory '" +
rootDir + "': " + entries.error());
}
foreach (const string& entry, entries.get()) {
ContainerID containerId =
protobuf::parseContainerId(Path(entry).basename());
// Check if this container has already been recovered.
if (infos.contains(containerId)) {
continue;
}
// An unknown orphan container. Recover it and then clean it up.
Try<Nothing> recover = recoverContainer(containerId);
if (recover.isError()) {
return Failure(
"Failed to recover CSI volumes for orphan container " +
stringify(containerId) + ": " + recover.error());
}
LOG(INFO) << "Cleaning up CSI volumes for unknown orphaned "
<< "container " << containerId;
cleanup(containerId);
}
return Nothing();
}
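
// Rebuilds the in-memory `Info` for a single container from its
// checkpointed CSI volumes, if any.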
Try<Nothing> VolumeCSIIsolatorProcess::recoverContainer(
const ContainerID& containerId)
{
const string containerDir = csi::paths::getContainerDir(rootDir, containerId);
if (!os::exists(containerDir)) {
    // This may occur in the following cases:
    //   1. The container has exited and the isolator has removed the
    //      container directory in '_cleanup()', but the agent died
    //      before noticing this.
    //   2. The agent died before the isolator checkpointed the CSI
    //      volumes for the container in 'prepare()'.
    // In both cases there is nothing to clean up for this container
    // after the agent restarts, so we do not need to do anything.
return Nothing();
}
const string volumesPath = csi::paths::getVolumesPath(rootDir, containerId);
if (!os::exists(volumesPath)) {
    // This may occur if the agent dies after creating the container
    // directory but before it checkpoints anything in it.
LOG(WARNING) << "The CSI volumes checkpoint file expected at '"
<< volumesPath << "' for container " << containerId
<< " does not exist";
    // Construct an info object with empty CSI volumes since no CSI volumes
    // are mounted yet for this container; it will be cleaned up by the
    // containerizer (as a known orphan container) or by `recover` (as an
    // unknown orphan container).
infos.put(containerId, Owned<Info>(new Info(hashset<CSIVolume>())));
return Nothing();
}
Result<CSIVolumes> read = state::read<CSIVolumes>(volumesPath);
if (read.isError()) {
return Error(
"Failed to read the CSI volumes checkpoint file '" +
volumesPath + "': " + read.error());
} else if (read.isNone()) {
    // This could happen if the agent is hard rebooted after the checkpoint
    // file is created but before the data is synced to disk.
    LOG(WARNING) << "The CSI volumes checkpoint file at '" << volumesPath
                 << "' for container " << containerId << " is empty";
    // Construct an info object with empty CSI volumes since no CSI volumes
    // are mounted yet for this container; it will be cleaned up by the
    // containerizer (as a known orphan container) or by `recover` (as an
    // unknown orphan container).
infos.put(containerId, Owned<Info>(new Info(hashset<CSIVolume>())));
return Nothing();
}
hashset<CSIVolume> volumes;
foreach (const CSIVolume& volume, read->volumes()) {
VLOG(1) << "Recovering CSI volume with plugin '" << volume.plugin_name()
<< "' and ID '" << volume.id() << "' for container " << containerId;
if (volumes.contains(volume)) {
return Error(
"Duplicate CSI volume with plugin '" + volume.plugin_name() +
"' and ID '" + volume.id() + "'");
}
volumes.insert(volume);
}
infos.put(containerId, Owned<Info>(new Info(volumes)));
return Nothing();
}

Future<Option<ContainerLaunchInfo>> VolumeCSIIsolatorProcess::prepare(
const ContainerID& containerId,
const ContainerConfig& containerConfig)
{
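  // Containers without a `ContainerInfo` cannot specify any CSI volumes,
  // so there is nothing to prepare.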
if (!containerConfig.has_container_info()) {
return None();
}
if (containerConfig.container_info().type() != ContainerInfo::MESOS) {
return Failure("Can only prepare CSI volumes for a MESOS container");
}
  // This hashset is used to detect duplicate CSI volumes specified for the
  // same container.
hashset<CSIVolume> volumeSet;
// Represents the CSI volume mounts that we want to do for the container.
vector<Mount> mounts;
foreach (const Volume& _volume, containerConfig.container_info().volumes()) {
if (!_volume.has_source() ||
!_volume.source().has_type() ||
_volume.source().type() != Volume::Source::CSI_VOLUME) {
continue;
}
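    // These invariants should have been enforced by validation before the
    // launch reaches this isolator, hence the CHECKs below.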
CHECK(_volume.source().has_csi_volume());
CHECK(_volume.source().csi_volume().has_static_provisioning());
const Volume::Source::CSIVolume& csiVolume = _volume.source().csi_volume();
const string& pluginName = csiVolume.plugin_name();
const string& volumeId = csiVolume.static_provisioning().volume_id();
const AccessMode& accessMode =
csiVolume.static_provisioning().volume_capability().access_mode();
if ((accessMode.mode() == AccessMode::SINGLE_NODE_READER_ONLY ||
accessMode.mode() == AccessMode::MULTI_NODE_READER_ONLY) &&
_volume.mode() == Volume::RW) {
return Failure(
"Cannot use the read-only volume '" +
volumeId + "' in read-write mode");
}
CSIVolume volume;
volume.set_plugin_name(pluginName);
volume.set_id(volumeId);
if (volumeSet.contains(volume)) {
return Failure(
"Found duplicate CSI volume with plugin '" +
pluginName + "' and volume ID '" + volumeId + "'");
}
// Determine the target of the mount.
string target;
    // The logic to determine a volume's mount target is identical to that of
    // the Linux filesystem isolator, because this isolator depends on that
    // isolator and assumes that, if the container specifies a rootfs, the
    // sandbox is already bind mounted into the container.
if (path::is_absolute(_volume.container_path())) {
      // To specify a CSI volume for a container, frameworks may define the
      // `container_path` either as an absolute path or as a relative path.
      // Please see the Linux filesystem isolator for details.
if (containerConfig.has_rootfs()) {
target = path::join(
containerConfig.rootfs(),
_volume.container_path());
Try<Nothing> mkdir = os::mkdir(target);
if (mkdir.isError()) {
return Failure(
"Failed to create the target of the mount at '" +
target + "': " + mkdir.error());
}
} else {
target = _volume.container_path();
if (!os::exists(target)) {
return Failure("Absolute container path '" + target + "' "
"does not exist");
}
}
} else {
if (containerConfig.has_rootfs()) {
target = path::join(containerConfig.rootfs(),
flags.sandbox_directory,
_volume.container_path());
} else {
target = path::join(containerConfig.directory(),
_volume.container_path());
}
      // NOTE: We cannot create the mount point at `target` if the container
      // has a rootfs defined, because the bind mount of the sandbox would
      // hide what's inside `target`. So we should always create the mount
      // point in `directory`.
string mountPoint = path::join(
containerConfig.directory(),
_volume.container_path());
Try<Nothing> mkdir = os::mkdir(mountPoint);
if (mkdir.isError()) {
return Failure(
"Failed to create the target of the mount at '" +
mountPoint + "': " + mkdir.error());
}
}
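    // Record the pending mount; the mount source is not known yet and will
    // be provided by the CSI server once the volume is published.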
Mount mount;
mount.volume = _volume;
mount.csiVolume = volume;
mount.target = target;
mounts.push_back(mount);
volumeSet.insert(volume);
}
if (volumeSet.empty()) {
return None();
}
// Create the `CSIVolumes` protobuf message to checkpoint.
CSIVolumes state;
foreach (const CSIVolume& volume, volumeSet) {
state.add_volumes()->CopyFrom(volume);
}
const string volumesPath = csi::paths::getVolumesPath(rootDir, containerId);
Try<Nothing> checkpoint = state::checkpoint(volumesPath, state);
if (checkpoint.isError()) {
return Failure(
"Failed to checkpoint CSI volumes at '" +
volumesPath + "': " + checkpoint.error());
}
VLOG(1) << "Successfully created checkpoint at '" << volumesPath << "'";
infos.put(containerId, Owned<Info>(new Info(volumeSet)));
  // Invoke the CSI server to publish the volumes.
vector<Future<string>> futures;
futures.reserve(mounts.size());
foreach (const Mount& mount, mounts) {
futures.push_back(csiServer->publishVolume(mount.volume));
}
return await(futures)
.then(defer(
PID<VolumeCSIIsolatorProcess>(this),
&VolumeCSIIsolatorProcess::_prepare,
containerId,
mounts,
containerConfig.has_user()
? containerConfig.user()
: Option<string>::none(),
lambda::_1));
}
Future<Option<ContainerLaunchInfo>> VolumeCSIIsolatorProcess::_prepare(
const ContainerID& containerId,
const vector<Mount>& mounts,
const Option<string>& user,
const vector<Future<string>>& futures)
{
ContainerLaunchInfo launchInfo;
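  // Request a new mount namespace for the container so that the volume
  // mounts below are not visible outside of it.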
launchInfo.add_clone_namespaces(CLONE_NEWNS);
vector<string> messages;
vector<string> sources;
foreach (const Future<string>& future, futures) {
if (!future.isReady()) {
messages.push_back(future.isFailed() ? future.failure() : "discarded");
continue;
}
sources.push_back(strings::trim(future.get()));
}
if (!messages.empty()) {
return Failure(strings::join("\n", messages));
}
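  // `await()` preserves the order of the futures, so the i-th mount source
  // corresponds to the i-th mount.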
CHECK_EQ(sources.size(), mounts.size());
for (size_t i = 0; i < sources.size(); i++) {
const string& source = sources[i];
const Mount& mount = mounts[i];
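    // For a non-root container user, make the volume writable by that user
    // via `chown`, but only if no other known container is using the same
    // volume, to avoid changing ownership from under a running container.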
if (user.isSome() && user.get() != "root") {
bool isVolumeInUse = false;
// Check if the volume is currently used by another container.
foreachpair (const ContainerID& _containerId,
const Owned<Info>& info,
infos) {
// Skip self.
if (_containerId == containerId) {
continue;
}
if (info->volumes.contains(mount.csiVolume)) {
isVolumeInUse = true;
break;
}
}
if (!isVolumeInUse) {
LOG(INFO) << "Changing the ownership of the CSI volume at '" << source
<< "' to user '" << user.get() << "' for container "
<< containerId;
Try<Nothing> chown = os::chown(user.get(), source, false);
if (chown.isError()) {
return Failure(
"Failed to set '" + user.get() + "' as the owner of the "
"CSI volume at '" + source + "': " + chown.error());
}
} else {
LOG(INFO) << "Leaving the ownership of the CSI volume at '"
<< source << "' unchanged because it is in use";
}
}
LOG(INFO) << "Mounting CSI volume mount point '" << source
<< "' to '" << mount.target << "' for container " << containerId;
*launchInfo.add_mounts() = protobuf::slave::createContainerMount(
source,
mount.target,
MS_BIND | MS_REC | (mount.volume.mode() == Volume::RO ? MS_RDONLY : 0));
}
return launchInfo;
}
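
// Unpublishes the container's CSI volumes (unless they are still referenced
// by other containers) and removes the container's checkpoint directory.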
Future<Nothing> VolumeCSIIsolatorProcess::cleanup(
const ContainerID& containerId)
{
if (!infos.contains(containerId)) {
VLOG(1) << "Ignoring cleanup request for unknown container " << containerId;
return Nothing();
}
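  // Count the number of containers (including this one) that reference each
  // volume; a volume is unpublished only when this container holds the last
  // reference to it.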
hashmap<CSIVolume, int> references;
foreachvalue (const Owned<Info>& info, infos) {
foreach (const CSIVolume& volume, info->volumes) {
if (!references.contains(volume)) {
references[volume] = 1;
} else {
references[volume]++;
}
}
}
vector<Future<Nothing>> futures;
foreach (const CSIVolume& volume, infos[containerId]->volumes) {
if (references.contains(volume) && references[volume] > 1) {
VLOG(1) << "Cannot unpublish the volume with plugin '"
<< volume.plugin_name() << "' and ID '" << volume.id()
<< "' for container " << containerId
<< " since its reference count is " << references[volume];
continue;
}
LOG(INFO) << "Unpublishing the volume with plugin '"
<< volume.plugin_name() << "' and ID '" << volume.id()
<< "' for container " << containerId;
    // Invoke the CSI server to unpublish the volume.
futures.push_back(
csiServer->unpublishVolume(volume.plugin_name(), volume.id()));
}
  // Erase the `Info` struct of this container before unpublishing the
  // volumes. This ensures that the volume's reference count is not wrongly
  // inflated if unpublishing fails; otherwise, the next time another
  // container using the same volume is destroyed, we would NOT unpublish
  // the volume since its reference count would be larger than 1.
infos.erase(containerId);
return await(futures)
.then(defer(
PID<VolumeCSIIsolatorProcess>(this),
&VolumeCSIIsolatorProcess::_cleanup,
containerId,
lambda::_1));
}
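
// Continuation of `cleanup()`: takes the results of unpublishing the
// volumes; any failure aborts the cleanup before the container's checkpoint
// directory is removed.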
Future<Nothing> VolumeCSIIsolatorProcess::_cleanup(
const ContainerID& containerId,
const vector<Future<Nothing>>& futures)
{
vector<string> messages;
foreach (const Future<Nothing>& future, futures) {
if (!future.isReady()) {
messages.push_back(future.isFailed() ? future.failure() : "discarded");
}
}
if (!messages.empty()) {
return Failure(strings::join("\n", messages));
}
const string containerDir = csi::paths::getContainerDir(rootDir, containerId);
Try<Nothing> rmdir = os::rmdir(containerDir);
if (rmdir.isError()) {
return Failure(
"Failed to remove the container directory at '" +
containerDir + "': " + rmdir.error());
}
LOG(INFO) << "Removed the container directory at '" << containerDir
<< "' for container " << containerId;
return Nothing();
}

} // namespace slave {
} // namespace internal {
} // namespace mesos {