blob: 3df62006047f837dca5edfc374b4a0e6bc36cc1f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <list>
#include <string>
#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/secret/resolver.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/http.hpp>
#include <process/id.hpp>
#include <process/owned.hpp>
#include <process/sequence.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/try.hpp>
#include "common/validation.hpp"
#include "csi/metrics.hpp"
#include "csi/paths.hpp"
#include "csi/service_manager.hpp"
#include "csi/volume_manager.hpp"
#include "slave/csi_server.hpp"
#include "slave/flags.hpp"
#include "slave/paths.hpp"
using mesos::csi::ServiceManager;
using mesos::csi::VolumeManager;
using mesos::csi::state::VolumeState;
using process::Failure;
using process::Future;
using process::Owned;
using process::Promise;
using process::grpc::client::Runtime;
using process::http::authentication::Principal;
using std::list;
using std::string;
using std::vector;
namespace mesos {
namespace internal {
namespace slave {
// Prefix handed to the `ServiceManager` of container-based plugins, and also
// used as the `cid_prefix` claim of the principal for which the CSI server's
// auth token is generated.
constexpr char DEFAULT_CSI_CONTAINER_PREFIX[] = "mesos-internal-csi-";
// Builds the `VolumeState` describing a statically-provisioned CSI volume.
// Defined further down in this file.
static VolumeState createVolumeState(const Volume& volume);
// Collects the CSI services listed by the plugin's containers or, if it has
// no containers, by its endpoints. Defined further down in this file.
static hashset<CSIPluginContainerInfo::Service> extractServices(
const CSIPluginInfo& plugin);
// The libprocess actor backing `CSIServer`. It keeps track of the CSI plugins
// known to the agent, initializes them (eagerly in `start()`, or lazily on
// the first publish/unpublish call naming a plugin that is not loaded yet)
// and forwards publish/unpublish calls to the plugin's `VolumeManager`.
class CSIServerProcess : public process::Process<CSIServerProcess>
{
public:
// Stores the given parameters; no plugin is loaded until `start()` or a
// publish/unpublish call. The process ID is generated with the
// "csi-server" prefix.
//
// `_secretGenerator` may be null, in which case no auth token is generated
// in `start()`. `_secretResolver` is passed through to
// `VolumeManager::create()`.
CSIServerProcess(
const process::http::URL& _agentUrl,
const string& _rootDir,
const string& _pluginConfigDir,
SecretGenerator* _secretGenerator,
SecretResolver* _secretResolver)
: process::ProcessBase(process::ID::generate("csi-server")),
agentUrl(_agentUrl),
rootDir(_rootDir),
pluginConfigDir(_pluginConfigDir),
secretGenerator(_secretGenerator),
secretResolver(_secretResolver) {}
// Records the agent ID, generates the auth token if a secret generator was
// provided, and then loads all plugin configurations found in the plugin
// config directory.
Future<Nothing> start(const SlaveID& _agentId);
// Publishes the given statically-provisioned CSI volume through its plugin
// and returns the volume's mount target path.
Future<string> publishVolume(const Volume& volume);
// Unpublishes the volume `volumeId` through the plugin `pluginName`.
Future<Nothing> unpublishVolume(
const string& pluginName,
const string& volumeId);
private:
// Everything the server keeps per loaded plugin.
struct CSIPlugin
{
CSIPlugin(
const CSIPluginInfo& _info,
const string& metricsPrefix)
: info(_info),
metrics(metricsPrefix) {}
// The configuration the plugin was loaded from.
CSIPluginInfo info;
// Set in `initializePlugin()` right after the plugin is inserted.
Owned<ServiceManager> serviceManager;
// Set during asynchronous initialization, once the plugin's API version
// has been obtained from the service manager.
Owned<VolumeManager> volumeManager;
// Passed to both the service manager and the volume manager.
Runtime runtime;
// Constructed with the per-plugin metrics prefix; a pointer to it is
// passed to both managers.
csi::Metrics metrics;
// CSI plugins are initialized lazily. When a publish/unpublish call is
// received for a plugin which is not yet initialized, this promise is used
// to perform the call after initialization is complete.
Promise<Nothing> initialized;
};
// Attempts to load configuration for a plugin with the specified name and
// then initializes the plugin. If no name is specified, then all
// configurations found in the plugin config directory are loaded.
Try<Nothing> initializePlugin(const Option<string>& name = None());
// Contains the plugins loaded by the server. The key of this map is the
// plugin name.
hashmap<string, CSIPlugin> plugins;
// Passed to the `ServiceManager` of container-based plugins.
const process::http::URL agentUrl;
// NONE until `start()` has been called.
Option<SlaveID> agentId;
// Passed to the service/volume managers and used to derive a plugin's mount
// root directory when its configuration has no `target_path_root`.
const string rootDir;
// Directory which is scanned for plugin configuration files.
const string pluginConfigDir;
// May be null; checked in `start()` before use.
SecretGenerator* secretGenerator;
SecretResolver* secretResolver;
// Set in `start()` from the generated secret, if a secret generator was
// provided; passed to the `ServiceManager` of container-based plugins.
Option<string> authToken;
};
// Loads CSI plugin configuration(s) from `pluginConfigDir` and starts the
// asynchronous initialization of each selected plugin.
//
// If `name` is SOME, only the first usable configuration whose `type` equals
// `name` is loaded, and that plugin must not already be present in `plugins`.
// If `name` is NONE, every usable configuration found is loaded.
//
// Returns an error if the directory cannot be listed, if a configuration file
// cannot be parsed as JSON or as a `CSIPluginInfo`, or if no usable
// configuration was found. Unreadable files, duplicate configurations and
// configurations which specify neither containers nor endpoints are logged
// and skipped.
//
// NOTE: A successful return only means that initialization has been started.
// Its outcome is reported through the plugin's `initialized` promise, and a
// plugin whose initialization does not succeed is removed from `plugins`.
Try<Nothing> CSIServerProcess::initializePlugin(const Option<string>& name)
{
  if (name.isSome()) {
    CHECK(!plugins.contains(name.get()));
  }

  Try<list<string>> entries = os::ls(pluginConfigDir);
  if (entries.isError()) {
    return Error(
        "Unable to list the CSI plugin configuration directory '" +
        pluginConfigDir + "': " + entries.error());
  }

  // We are either looking for one specific plugin (if `name` is SOME), or we
  // are loading all configs we find (if `name` is NONE). First, we populate
  // `pluginConfigs` with one or more valid configurations. Then, we will
  // initialize the plugin(s) based on the configuration(s) found.
  hashmap<string, CSIPluginInfo> pluginConfigs;

  foreach (const string& entry, entries.get()) {
    const string path = path::join(pluginConfigDir, entry);

    // Ignore directory entries.
    if (os::stat::isdir(path)) {
      continue;
    }

    Try<string> read = os::read(path);
    if (read.isError()) {
      // In case of an error we log and skip to the next entry.
      LOG(ERROR) << "Failed to read CSI plugin configuration file '"
                 << path << "': " << read.error();

      continue;
    }

    Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
    if (json.isError()) {
      return Error("JSON parse of '" + path + "' failed: " + json.error());
    }

    Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get());
    if (parse.isError()) {
      return Error("Protobuf parse of '" + path + "' failed: " + parse.error());
    }

    const CSIPluginInfo& csiPluginConfig = parse.get();
    const string& type = csiPluginConfig.type();

    if (pluginConfigs.contains(type)) {
      LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. "
                 << "Skipping configuration file '" << path << "' since CSI "
                 << "plugin '" << type << "' already exists";

      continue;
    }

    if (name.isNone() || name.get() == type) {
      // A plugin is reached either through containers or through endpoints.
      // A configuration providing neither cannot be initialized, so we
      // reject it here instead of tripping the `CHECK`s further down, which
      // would abort the whole process because of a bad configuration file.
      if (csiPluginConfig.containers_size() == 0 &&
          csiPluginConfig.endpoints_size() == 0) {
        LOG(ERROR) << "Skipping configuration file '" << path << "' since CSI "
                   << "plugin '" << type << "' specifies neither containers "
                   << "nor endpoints";

        continue;
      }

      pluginConfigs[type] = csiPluginConfig;

      // When looking for one specific plugin, the first match wins.
      if (name.isSome()) {
        break;
      }
    }
  }

  if (pluginConfigs.empty()) {
    return Error(
        "No valid CSI plugin configurations found in '" +
        pluginConfigDir + "'");
  }

  foreachpair (const string& _name, const CSIPluginInfo& info, pluginConfigs) {
    // Insert the plugin struct into the map first, so that the runtime and
    // metrics objects handed to the service manager below are the ones owned
    // by the map entry.
    plugins.put(_name, CSIPlugin(info, "csi_plugins/" + _name + "/"));

    CSIPlugin& plugin = plugins.at(_name);

    if (info.containers_size() > 0) {
      CHECK_SOME(agentId);

      plugin.serviceManager.reset(new ServiceManager(
          agentId.get(),
          agentUrl,
          rootDir,
          info,
          extractServices(info),
          DEFAULT_CSI_CONTAINER_PREFIX,
          authToken,
          plugin.runtime,
          &plugin.metrics));
    } else {
      // Guaranteed by the validation performed while loading configurations.
      CHECK(info.endpoints_size() > 0);

      plugin.serviceManager.reset(new ServiceManager(
          info,
          extractServices(info),
          plugin.runtime,
          &plugin.metrics));
    }

    // Initialization chain: recover the service manager, ask it for the API
    // version, create and recover the volume manager. The continuations look
    // the plugin up by name rather than holding on to `plugin`.
    plugin.initialized.associate(
        plugin.serviceManager->recover()
          .then(defer(self(), [=]() {
            CHECK(plugins.contains(_name));

            return plugins.at(_name).serviceManager->getApiVersion();
          }))
          .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
            CHECK(plugins.contains(_name));

            Try<Owned<VolumeManager>> volumeManager = VolumeManager::create(
                rootDir,
                info,
                extractServices(info),
                apiVersion,
                plugins.at(_name).runtime,
                plugins.at(_name).serviceManager.get(),
                &plugins.at(_name).metrics,
                secretResolver);

            if (volumeManager.isError()) {
              return Failure(
                  "CSI server failed to create volume manager for plugin"
                  " '" + info.name() + "': " + volumeManager.error());
            }

            plugins.at(_name).volumeManager = std::move(volumeManager.get());

            return plugins.at(_name).volumeManager->recover();
          }))
          .onAny(defer(self(), [=](const Future<Nothing>& future) {
            if (!future.isReady()) {
              // Drop the plugin so that a later call can attempt to
              // initialize it again.
              plugins.erase(_name);

              LOG(ERROR)
                << "CSI server failed to initialize plugin '" << _name << "': "
                << (future.isFailed() ? future.failure() : "discarded");
            } else {
              LOG(INFO)
                << "CSI server successfully initialized plugin '"
                << _name << "'";
            }
          })));
  }

  return Nothing();
}
// Starts the CSI server on behalf of the agent `_agentId`: remembers the
// agent ID, generates the auth token when a secret generator is available,
// and finally loads every CSI plugin configuration found.
Future<Nothing> CSIServerProcess::start(const SlaveID& _agentId)
{
  // NOTE: The agent may receive several `SlaveRegisteredMessage`s with a
  // detected disconnection in between, in which case `Slave::registered`
  // ends up calling `start` more than once. Such repeated calls must carry
  // the agent ID we already know and do nothing else.
  if (agentId.isSome()) {
    CHECK_EQ(agentId.get(), _agentId)
      << "Cannot start CSI server with agent ID " << _agentId
      << " (expected: " << agentId.get() << ")";

    return Nothing();
  }

  agentId = _agentId;

  // Becomes the token generation future below if a generator was provided;
  // otherwise it is ready right away.
  Future<Nothing> tokenReady = Nothing();

  if (secretGenerator != nullptr) {
    // What goes into this principal is arbitrary. We deliberately leave out
    // a 'value' string so that we cannot accidentally collide with a real
    // principal that has restricted permissions.
    const Principal csiPrincipal(
        Option<string>::none(),
        {{"cid_prefix", DEFAULT_CSI_CONTAINER_PREFIX}});

    tokenReady = secretGenerator->generate(csiPrincipal)
      .then(defer(self(), [=](const Secret& generated) -> Future<Nothing> {
        const Option<Error> invalid =
          common::validation::validateSecret(generated);

        if (invalid.isSome()) {
          return Failure(
              "CSI server failed to validate generated secret: " +
              invalid->message);
        }

        if (generated.type() != Secret::VALUE) {
          return Failure(
              "CSI server expecting generated secret to be of VALUE type "
              "instead of " + stringify(generated.type()) + " type; " +
              "only VALUE type secrets are supported at this time");
        }

        CHECK(generated.has_value());

        authToken = generated.value().data();

        return Nothing();
      }));
  }

  // NOTE: `initializePlugin()` relies on `authToken` having been set, which
  // is why loading the plugin configurations is chained after `tokenReady`.
  return tokenReady
    .then(defer(self(), [=]() -> Future<Nothing> {
      // No name given, i.e., load all CSI plugin configurations found.
      const Try<Nothing> loaded = initializePlugin();

      if (loaded.isError()) {
        return Failure(
            "CSI server failed to initialize CSI plugins: " + loaded.error());
      }

      return Nothing();
    }));
}
// Publishes a statically-provisioned CSI volume through its plugin and
// returns the path at which the volume is mounted. The plugin is loaded on
// demand if it is not known yet; the call is performed once the plugin has
// finished initializing.
Future<string> CSIServerProcess::publishVolume(const Volume& volume)
{
  CHECK(volume.has_source() &&
        volume.source().has_type() &&
        volume.source().type() == Volume::Source::CSI_VOLUME);

  CHECK(volume.source().has_csi_volume() &&
        volume.source().csi_volume().has_static_provisioning());

  const Volume::Source::CSIVolume& csiVolume = volume.source().csi_volume();
  const string& name = csiVolume.plugin_name();

  // Copied so that the continuations below do not need the whole volume
  // source just to get at the volume ID.
  const string volumeId = csiVolume.static_provisioning().volume_id();

  if (!plugins.contains(name)) {
    // Unknown plugin: try to load its configuration, kick off its
    // initialization, and have it inserted into the `plugins` map.
    const Try<Nothing> loaded = initializePlugin(name);

    if (loaded.isError()) {
      return Failure(
          "Failed to initialize CSI plugin '" +
          name + "': " + loaded.error());
    }
  }

  CHECK(plugins.contains(name));

  const Future<Nothing> initialized = plugins.at(name).initialized.future();

  return initialized
    .then(defer(self(), [=]() {
      CHECK(plugins.contains(name));

      return plugins.at(name).volumeManager->publishVolume(
          volumeId,
          createVolumeState(volume));
    }))
    .then(defer(self(), [=]() {
      CHECK(plugins.contains(name));

      const CSIPluginInfo& info = plugins.at(name).info;

      // An explicitly configured target path root takes precedence over the
      // mount root directory derived from the CSI root directory.
      string mountRootDir;
      if (info.has_target_path_root()) {
        mountRootDir = info.target_path_root();
      } else {
        mountRootDir =
          csi::paths::getMountRootDir(rootDir, info.type(), info.name());
      }

      return csi::paths::getMountTargetPath(mountRootDir, volumeId);
    }));
}
// Unpublishes the volume `volumeId` through the plugin `pluginName`. The
// plugin is loaded on demand if it is not known yet; the call is performed
// once the plugin has finished initializing.
//
// Returns a failure if the plugin's configuration cannot be loaded, or if
// its initialization or the unpublish call itself fails.
Future<Nothing> CSIServerProcess::unpublishVolume(
    const string& pluginName,
    const string& volumeId)
{
  if (!plugins.contains(pluginName)) {
    // This will attempt to load the plugin's configuration, initialize the
    // plugin, and insert it into the `plugins` map.
    Try<Nothing> pluginInit = initializePlugin(pluginName);
    if (pluginInit.isError()) {
      return Failure(
          "Failed to initialize CSI plugin '" +
          pluginName + "': " + pluginInit.error());
    }
  }

  CHECK(plugins.contains(pluginName));

  return plugins.at(pluginName).initialized.future()
    .then(defer(self(), [=]() {
      // Same guard as in the `publishVolume()` continuations: the plugin is
      // looked up again by name when the continuation runs, so make sure it
      // is still there before calling `at()`.
      CHECK(plugins.contains(pluginName));

      return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId);
    }));
}
// Builds the `VolumeState` for a statically-provisioned CSI volume: the
// volume is marked pre-provisioned and in state `NODE_READY`, its capability
// and context are copied from the static provisioning info, and it is
// read-only iff the volume's mode is `RO`.
VolumeState createVolumeState(const Volume& volume)
{
  const Volume::Source::CSIVolume::StaticProvisioning& provisioning =
    volume.source().csi_volume().static_provisioning();

  const bool readOnly = (volume.mode() == Volume::RO);

  VolumeState state;

  state.set_pre_provisioned(true);
  state.set_readonly(readOnly);
  state.set_state(VolumeState::NODE_READY);

  state.mutable_volume_capability()->CopyFrom(
      provisioning.volume_capability());

  *state.mutable_volume_context() = provisioning.volume_context();

  return state;
}
// Returns the set of CSI services provided by `plugin`. If the plugin has
// containers, the services of all its containers are collected; otherwise
// the plugin must have at least one endpoint, and the services of its
// endpoints are collected.
hashset<CSIPluginContainerInfo::Service> extractServices(
    const CSIPluginInfo& plugin)
{
  hashset<CSIPluginContainerInfo::Service> services;

  if (plugin.containers_size() == 0) {
    CHECK(plugin.endpoints_size() > 0);

    foreach (const CSIPluginEndpoint& endpoint, plugin.endpoints()) {
      services.insert(endpoint.csi_service());
    }

    return services;
  }

  foreach (const CSIPluginContainerInfo& container, plugin.containers()) {
    const int count = container.services_size();

    for (int index = 0; index < count; ++index) {
      services.insert(container.services(index));
    }
  }

  return services;
}
// Creates the backing `CSIServerProcess` from the given parameters and
// spawns it.
CSIServer::CSIServer(
    const process::http::URL& agentUrl,
    const string& rootDir,
    const string& pluginConfigDir,
    SecretGenerator* secretGenerator,
    SecretResolver* secretResolver)
  : process(new CSIServerProcess(
        agentUrl,
        rootDir,
        pluginConfigDir,
        secretGenerator,
        secretResolver))
{
  // The process must have been allocated before it can be spawned.
  auto* const backend = CHECK_NOTNULL(process.get());

  process::spawn(backend);
}
// Terminates the backing process and waits for it before returning.
CSIServer::~CSIServer()
{
  auto* const backend = process.get();

  process::terminate(backend);
  process::wait(backend);
}
// Validates the agent flags relevant to CSI and creates the CSI server.
//
// Returns an error if the 'volume/csi' isolator is not part of the isolation
// flag, if `--csi_plugin_config_dir` is unset or empty, or if the directory
// it names does not exist. The server's root directory is derived from the
// agent's work directory.
Try<Owned<CSIServer>> CSIServer::create(
    const Flags& flags,
    const process::http::URL& agentUrl,
    SecretGenerator* secretGenerator,
    SecretResolver* secretResolver)
{
  const bool hasCsiIsolator = strings::contains(flags.isolation, "volume/csi");
  if (!hasCsiIsolator) {
    return Error("Missing required isolator 'volume/csi'");
  }

  const auto& configDirFlag = flags.csi_plugin_config_dir;

  const bool hasConfigDir = configDirFlag.isSome() && !configDirFlag->empty();
  if (!hasConfigDir) {
    return Error("Missing required '--csi_plugin_config_dir' flag");
  }

  const string& configDir = configDirFlag.get();

  if (!os::exists(configDir)) {
    return Error(
        "The CSI plugin configuration directory '" +
        configDir + "' does not exist");
  }

  const string csiRootDir = slave::paths::getCsiRootDir(flags.work_dir);

  return new CSIServer(
      agentUrl,
      csiRootDir,
      configDir,
      secretGenerator,
      secretResolver);
}
// Dispatches `start` to the backing process, ties the outcome to the
// `started` promise, and returns that promise's future. Publish and
// unpublish calls are chained onto the same future.
Future<Nothing> CSIServer::start(const SlaveID& agentId)
{
  const Future<Nothing> dispatched =
    process::dispatch(process.get(), &CSIServerProcess::start, agentId);

  started.associate(dispatched);

  return started.future();
}
// Forwards the publish call to the backing process once the server has been
// started, i.e., once the `started` future is ready.
Future<string> CSIServer::publishVolume(const Volume& volume)
{
  const Future<Nothing> ready = started.future();

  return ready.then(
      process::defer(process.get(), &CSIServerProcess::publishVolume, volume));
}
// Forwards the unpublish call to the backing process once the server has
// been started, i.e., once the `started` future is ready.
Future<Nothing> CSIServer::unpublishVolume(
    const string& pluginName,
    const string& volumeId)
{
  const Future<Nothing> ready = started.future();

  return ready.then(process::defer(
      process.get(),
      &CSIServerProcess::unpublishVolume,
      pluginName,
      volumeId));
}
} // namespace slave {
} // namespace internal {
} // namespace mesos {