blob: 93301f8af9b494bbcecb10e6cd4f4073b6a05070 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <csi/spec.hpp>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/security/server_credentials.h>
#include <stout/bytes.hpp>
#include <stout/flags.hpp>
#include <stout/hashmap.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/os/exists.hpp>
#include <stout/os/ls.hpp>
#include <stout/os/mkdir.hpp>
#include <stout/os/rmdir.hpp>
#include "csi/utils.hpp"
#include "linux/fs.hpp"
#include "logging/logging.hpp"
namespace fs = mesos::internal::fs;
using std::cerr;
using std::cout;
using std::endl;
using std::list;
using std::max;
using std::min;
using std::string;
using std::unique_ptr;
using std::vector;
using grpc::InsecureServerCredentials;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
constexpr char NODE_ID[] = "localhost";
constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64);
class Flags : public virtual mesos::internal::logging::Flags
{
public:
Flags()
{
add(&Flags::endpoint,
"endpoint",
"Path to the Unix domain socket the plugin should bind to.");
add(&Flags::work_dir,
"work_dir",
"Path to the work directory of the plugin.");
add(&Flags::available_capacity,
"available_capacity",
"The available disk capacity managed by the plugin, in addition\n"
"to the pre-existing volumes specified in the --volumes flag.");
add(&Flags::volumes,
"volumes",
"Creates pre-existing volumes upon start-up. The volumes are\n"
"specified as a semicolon-delimited list of name:capacity pairs.\n"
"If a volume with the same name already exists, the pair will be\n"
"ignored. (Example: 'volume1:1GB;volume2:2GB')");
}
string endpoint;
string work_dir;
Bytes available_capacity;
Option<string> volumes;
};
class TestCSIPlugin
: public csi::v0::Identity::Service,
public csi::v0::Controller::Service,
public csi::v0::Node::Service
{
public:
TestCSIPlugin(
const string& _workDir,
const string& _endpoint,
const Bytes& _availableCapacity,
const hashmap<string, Bytes>& _volumes)
: workDir(_workDir),
endpoint(_endpoint),
availableCapacity(_availableCapacity)
{
// TODO(jieyu): Consider not using CHECKs here.
Try<list<string>> paths = os::ls(workDir);
CHECK_SOME(paths);
foreach (const string& path, paths.get()) {
Try<VolumeInfo> volumeInfo = parseVolumePath(path);
CHECK_SOME(volumeInfo);
CHECK(!volumes.contains(volumeInfo->id));
volumes.put(volumeInfo->id, volumeInfo.get());
if (!_volumes.contains(volumeInfo->id)) {
CHECK_GE(availableCapacity, volumeInfo->size);
availableCapacity -= volumeInfo->size;
}
}
foreachpair (const string& name, const Bytes& capacity, _volumes) {
if (volumes.contains(name)) {
continue;
}
VolumeInfo volumeInfo;
volumeInfo.id = name;
volumeInfo.size = capacity;
const string path = getVolumePath(volumeInfo);
Try<Nothing> mkdir = os::mkdir(path);
CHECK_SOME(mkdir);
volumes.put(volumeInfo.id, volumeInfo);
}
ServerBuilder builder;
builder.AddListeningPort(endpoint, InsecureServerCredentials());
builder.RegisterService(static_cast<csi::v0::Identity::Service*>(this));
builder.RegisterService(static_cast<csi::v0::Controller::Service*>(this));
builder.RegisterService(static_cast<csi::v0::Node::Service*>(this));
server = builder.BuildAndStart();
}
void wait()
{
if (server) {
server->Wait();
}
}
Status GetPluginInfo(
ServerContext* context,
const csi::v0::GetPluginInfoRequest* request,
csi::v0::GetPluginInfoResponse* response) override;
Status GetPluginCapabilities(
ServerContext* context,
const csi::v0::GetPluginCapabilitiesRequest* request,
csi::v0::GetPluginCapabilitiesResponse* response) override;
Status Probe(
ServerContext* context,
const csi::v0::ProbeRequest* request,
csi::v0::ProbeResponse* response) override;
Status CreateVolume(
ServerContext* context,
const csi::v0::CreateVolumeRequest* request,
csi::v0::CreateVolumeResponse* response) override;
Status DeleteVolume(
ServerContext* context,
const csi::v0::DeleteVolumeRequest* request,
csi::v0::DeleteVolumeResponse* response) override;
Status ControllerPublishVolume(
ServerContext* context,
const csi::v0::ControllerPublishVolumeRequest* request,
csi::v0::ControllerPublishVolumeResponse* response) override;
Status ControllerUnpublishVolume(
ServerContext* context,
const csi::v0::ControllerUnpublishVolumeRequest* request,
csi::v0::ControllerUnpublishVolumeResponse* response) override;
Status ValidateVolumeCapabilities(
ServerContext* context,
const csi::v0::ValidateVolumeCapabilitiesRequest* request,
csi::v0::ValidateVolumeCapabilitiesResponse* response) override;
Status ListVolumes(
ServerContext* context,
const csi::v0::ListVolumesRequest* request,
csi::v0::ListVolumesResponse* response) override;
Status GetCapacity(
ServerContext* context,
const csi::v0::GetCapacityRequest* request,
csi::v0::GetCapacityResponse* response) override;
Status ControllerGetCapabilities(
ServerContext* context,
const csi::v0::ControllerGetCapabilitiesRequest* request,
csi::v0::ControllerGetCapabilitiesResponse* response) override;
Status NodeStageVolume(
ServerContext* context,
const csi::v0::NodeStageVolumeRequest* request,
csi::v0::NodeStageVolumeResponse* response) override;
Status NodeUnstageVolume(
ServerContext* context,
const csi::v0::NodeUnstageVolumeRequest* request,
csi::v0::NodeUnstageVolumeResponse* response) override;
Status NodePublishVolume(
ServerContext* context,
const csi::v0::NodePublishVolumeRequest* request,
csi::v0::NodePublishVolumeResponse* response) override;
Status NodeUnpublishVolume(
ServerContext* context,
const csi::v0::NodeUnpublishVolumeRequest* request,
csi::v0::NodeUnpublishVolumeResponse* response) override;
Status NodeGetId(
ServerContext* context,
const csi::v0::NodeGetIdRequest* request,
csi::v0::NodeGetIdResponse* response) override;
Status NodeGetCapabilities(
ServerContext* context,
const csi::v0::NodeGetCapabilitiesRequest* request,
csi::v0::NodeGetCapabilitiesResponse* response) override;
private:
struct VolumeInfo
{
string id;
Bytes size;
};
string getVolumePath(const VolumeInfo& volumeInfo);
Try<VolumeInfo> parseVolumePath(const string& path);
const string workDir;
const string endpoint;
Bytes availableCapacity;
hashmap<string, VolumeInfo> volumes;
unique_ptr<Server> server;
};
Status TestCSIPlugin::GetPluginInfo(
ServerContext* context,
const csi::v0::GetPluginInfoRequest* request,
csi::v0::GetPluginInfoResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->set_name(PLUGIN_NAME);
response->set_vendor_version(MESOS_VERSION);
return Status::OK;
}
Status TestCSIPlugin::GetPluginCapabilities(
ServerContext* context,
const csi::v0::GetPluginCapabilitiesRequest* request,
csi::v0::GetPluginCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_service()->set_type(
csi::v0::PluginCapability::Service::CONTROLLER_SERVICE);
return Status::OK;
}
Status TestCSIPlugin::Probe(
ServerContext* context,
const csi::v0::ProbeRequest* request,
csi::v0::ProbeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
return Status::OK;
}
Status TestCSIPlugin::CreateVolume(
ServerContext* context,
const csi::v0::CreateVolumeRequest* request,
csi::v0::CreateVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (request->name().empty()) {
return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty");
}
if (request->name().find_first_of(os::PATH_SEPARATOR) != string::npos) {
return Status(grpc::INVALID_ARGUMENT, "Volume name cannot contain '/'");
}
bool alreadyExists = volumes.contains(request->name());
if (!alreadyExists) {
if (availableCapacity == Bytes(0)) {
return Status(grpc::OUT_OF_RANGE, "Insufficient capacity");
}
VolumeInfo volumeInfo;
volumeInfo.id = request->name();
volumeInfo.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity);
if (request->has_capacity_range()) {
const csi::v0::CapacityRange& range = request->capacity_range();
// The highest we can pick.
Bytes limit = availableCapacity;
if (range.limit_bytes() != 0) {
limit = min(availableCapacity, Bytes(range.limit_bytes()));
}
if (range.required_bytes() != 0 &&
static_cast<size_t>(range.required_bytes()) > limit.bytes()) {
return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'");
}
volumeInfo.size = min(
limit,
max(DEFAULT_VOLUME_CAPACITY, Bytes(range.required_bytes())));
}
const string path = getVolumePath(volumeInfo);
Try<Nothing> mkdir = os::mkdir(path);
if (mkdir.isError()) {
return Status(
grpc::INTERNAL,
"Failed to create volume '" + volumeInfo.id + "': " + mkdir.error());
}
CHECK_GE(availableCapacity, volumeInfo.size);
availableCapacity -= volumeInfo.size;
volumes.put(volumeInfo.id, volumeInfo);
}
const VolumeInfo& volumeInfo = volumes.at(request->name());
response->mutable_volume()->set_id(volumeInfo.id);
response->mutable_volume()->set_capacity_bytes(volumeInfo.size.bytes());
(*response->mutable_volume()->mutable_attributes())["path"] =
getVolumePath(volumeInfo);
if (alreadyExists) {
return Status(
grpc::ALREADY_EXISTS,
"Volume with name '" + request->name() + "' already exists");
}
return Status::OK;
}
Status TestCSIPlugin::DeleteVolume(
ServerContext* context,
const csi::v0::DeleteVolumeRequest* request,
csi::v0::DeleteVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status::OK;
}
const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
const string path = getVolumePath(volumeInfo);
Try<Nothing> rmdir = os::rmdir(path);
if (rmdir.isError()) {
return Status(
grpc::INTERNAL,
"Failed to delete volume '" + request->volume_id() + "': " +
rmdir.error());
}
availableCapacity += volumeInfo.size;
volumes.erase(volumeInfo.id);
return Status::OK;
}
Status TestCSIPlugin::ControllerPublishVolume(
ServerContext* context,
const csi::v0::ControllerPublishVolumeRequest* request,
csi::v0::ControllerPublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
const string path = getVolumePath(volumeInfo);
auto it = request->volume_attributes().find("path");
if (it == request->volume_attributes().end() || it->second != path) {
return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
}
if (request->node_id() != NODE_ID) {
return Status(
grpc::NOT_FOUND,
"Node '" + request->node_id() + "' is not found");
}
// Do nothing.
return Status::OK;
}
Status TestCSIPlugin::ControllerUnpublishVolume(
ServerContext* context,
const csi::v0::ControllerUnpublishVolumeRequest* request,
csi::v0::ControllerUnpublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
if (request->node_id() != NODE_ID) {
return Status(
grpc::NOT_FOUND,
"Node '" + request->node_id() + "' is not found");
}
// Do nothing.
return Status::OK;
}
Status TestCSIPlugin::ValidateVolumeCapabilities(
ServerContext* context,
const csi::v0::ValidateVolumeCapabilitiesRequest* request,
csi::v0::ValidateVolumeCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
const string path = getVolumePath(volumeInfo);
auto it = request->volume_attributes().find("path");
if (it == request->volume_attributes().end() || it->second != path) {
return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
}
foreach (const csi::v0::VolumeCapability& capability,
request->volume_capabilities()) {
if (capability.has_mount() &&
(!capability.mount().fs_type().empty() ||
!capability.mount().mount_flags().empty())) {
response->set_supported(false);
response->set_message("Only default capability is supported");
return Status::OK;
}
if (capability.access_mode().mode() !=
csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
response->set_supported(false);
response->set_message("Access mode is not supported");
return Status::OK;
}
}
response->set_supported(true);
return Status::OK;
}
Status TestCSIPlugin::ListVolumes(
ServerContext* context,
const csi::v0::ListVolumesRequest* request,
csi::v0::ListVolumesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Support the `max_entries` field.
if (request->max_entries() > 0) {
return Status(grpc::ABORTED, "Field 'max_entries' is not supported");
}
// TODO(chhsiao): Support the `starting_token` fields.
if (!request->starting_token().empty()) {
return Status(grpc::ABORTED, "Field 'starting_token' is not supported");
}
foreachvalue (const VolumeInfo& volumeInfo, volumes) {
csi::v0::Volume* volume = response->add_entries()->mutable_volume();
volume->set_id(volumeInfo.id);
volume->set_capacity_bytes(volumeInfo.size.bytes());
(*volume->mutable_attributes())["path"] = getVolumePath(volumeInfo);
}
return Status::OK;
}
Status TestCSIPlugin::GetCapacity(
ServerContext* context,
const csi::v0::GetCapacityRequest* request,
csi::v0::GetCapacityResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
foreach (const csi::v0::VolumeCapability& capability,
request->volume_capabilities()) {
// We report zero capacity for any capability other than the
// default-constructed `MountVolume` capability since this plugin
// does not support any filesystem types and mount flags.
if (!capability.has_mount() ||
!capability.mount().fs_type().empty() ||
!capability.mount().mount_flags().empty()) {
response->set_available_capacity(0);
return Status::OK;
}
if (capability.access_mode().mode() !=
csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
response->set_available_capacity(0);
return Status::OK;
}
}
response->set_available_capacity(availableCapacity.bytes());
return Status::OK;
}
Status TestCSIPlugin::ControllerGetCapabilities(
ServerContext* context,
const csi::v0::ControllerGetCapabilitiesRequest* request,
csi::v0::ControllerGetCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_rpc()->set_type(
csi::v0::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME);
response->add_capabilities()->mutable_rpc()->set_type(
csi::v0::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME);
response->add_capabilities()->mutable_rpc()->set_type(
csi::v0::ControllerServiceCapability::RPC::GET_CAPACITY);
response->add_capabilities()->mutable_rpc()->set_type(
csi::v0::ControllerServiceCapability::RPC::LIST_VOLUMES);
return Status::OK;
}
Status TestCSIPlugin::NodeStageVolume(
ServerContext* context,
const csi::v0::NodeStageVolumeRequest* request,
csi::v0::NodeStageVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
const string path = getVolumePath(volumeInfo);
auto it = request->volume_attributes().find("path");
if (it == request->volume_attributes().end() || it->second != path) {
return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
}
if (!os::exists(request->staging_target_path())) {
return Status(
grpc::INVALID_ARGUMENT,
"Target path '" + request->staging_target_path() + "' is not found");
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return Status(
grpc::INTERNAL,
"Failed to get mount table: " + table.error());
}
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
if (entry.target == request->staging_target_path()) {
return Status::OK;
}
}
Try<Nothing> mount = fs::mount(
path,
request->staging_target_path(),
None(),
MS_BIND,
None());
if (mount.isError()) {
return Status(
grpc::INTERNAL,
"Failed to mount from '" + path + "' to '" +
request->staging_target_path() + "': " + mount.error());
}
return Status::OK;
}
Status TestCSIPlugin::NodeUnstageVolume(
ServerContext* context,
const csi::v0::NodeUnstageVolumeRequest* request,
csi::v0::NodeUnstageVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return Status(
grpc::INTERNAL,
"Failed to get mount table: " + table.error());
}
bool found = false;
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
if (entry.target == request->staging_target_path()) {
found = true;
break;
}
}
if (!found) {
return Status::OK;
}
Try<Nothing> unmount = fs::unmount(request->staging_target_path());
if (unmount.isError()) {
return Status(
grpc::INTERNAL,
"Failed to unmount '" + request->staging_target_path() +
"': " + unmount.error());
}
return Status::OK;
}
Status TestCSIPlugin::NodePublishVolume(
ServerContext* context,
const csi::v0::NodePublishVolumeRequest* request,
csi::v0::NodePublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
const string path = getVolumePath(volumeInfo);
auto it = request->volume_attributes().find("path");
if (it == request->volume_attributes().end() || it->second != path) {
return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
}
if (!os::exists(request->target_path())) {
return Status(
grpc::INVALID_ARGUMENT,
"Target path '" + request->target_path() + "' is not found");
}
if (request->staging_target_path().empty()) {
return Status(
grpc::FAILED_PRECONDITION,
"Expecting 'staging_target_path' to be set");
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return Status(
grpc::INTERNAL,
"Failed to get mount table: " + table.error());
}
bool found = false;
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
if (entry.target == request->staging_target_path()) {
found = true;
break;
}
}
if (!found) {
return Status(
grpc::FAILED_PRECONDITION,
"Volume '" + request->volume_id() + "' has not been staged yet");
}
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
if (entry.target == request->target_path()) {
return Status::OK;
}
}
Try<Nothing> mount = fs::mount(
request->staging_target_path(),
request->target_path(),
None(),
MS_BIND,
None());
if (mount.isError()) {
return Status(
grpc::INTERNAL,
"Failed to mount from '" + path + "' to '" +
request->target_path() + "': " + mount.error());
}
if (request->readonly()) {
mount = fs::mount(
None(),
request->target_path(),
None(),
MS_BIND | MS_RDONLY | MS_REMOUNT,
None());
return Status(
grpc::INTERNAL,
"Failed to mount '" + request->target_path() +
"' as read only: " + mount.error());
}
return Status::OK;
}
Status TestCSIPlugin::NodeUnpublishVolume(
ServerContext* context,
const csi::v0::NodeUnpublishVolumeRequest* request,
csi::v0::NodeUnpublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
if (!volumes.contains(request->volume_id())) {
return Status(
grpc::NOT_FOUND,
"Volume '" + request->volume_id() + "' is not found");
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return Status(
grpc::INTERNAL,
"Failed to get mount table: " + table.error());
}
bool found = false;
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
if (entry.target == request->target_path()) {
found = true;
break;
}
}
if (!found) {
return Status::OK;
}
Try<Nothing> unmount = fs::unmount(request->target_path());
if (unmount.isError()) {
return Status(
grpc::INTERNAL,
"Failed to unmount '" + request->target_path() +
"': " + unmount.error());
}
return Status::OK;
}
Status TestCSIPlugin::NodeGetId(
ServerContext* context,
const csi::v0::NodeGetIdRequest* request,
csi::v0::NodeGetIdResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->set_node_id(NODE_ID);
return Status::OK;
}
Status TestCSIPlugin::NodeGetCapabilities(
ServerContext* context,
const csi::v0::NodeGetCapabilitiesRequest* request,
csi::v0::NodeGetCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_rpc()->set_type(
csi::v0::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME);
return Status::OK;
}
string TestCSIPlugin::getVolumePath(const VolumeInfo& volumeInfo)
{
return path::join(
workDir,
strings::join("-", stringify(volumeInfo.size), volumeInfo.id));
}
Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath(
const string& path)
{
size_t pos = path.find_first_of("-");
if (pos == string::npos) {
return Error("Cannot find the delimiter");
}
string bytesString = path.substr(0, path.find_first_of("-"));
string id = path.substr(path.find_first_of("-") + 1);
Try<Bytes> bytes = Bytes::parse(bytesString);
if (bytes.isError()) {
return Error("Failed to parse bytes: " + bytes.error());
}
VolumeInfo volumeInfo;
volumeInfo.id = id;
volumeInfo.size = bytes.get();
return volumeInfo;
}
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("CSI_", argc, argv);
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
mesos::internal::logging::initialize(argv[0], true, flags);
// Log any flag warnings.
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
Try<Nothing> mkdir = os::mkdir(flags.work_dir);
if (mkdir.isError()) {
cerr << "Failed to create working directory '" << flags.work_dir
<< "': " << mkdir.error() << endl;
return EXIT_FAILURE;
}
hashmap<string, Bytes> volumes;
if (flags.volumes.isSome()) {
foreach (const string& token, strings::tokenize(flags.volumes.get(), ";")) {
vector<string> pair = strings::tokenize(token, ":");
Option<Error> error;
if (pair.size() != 2) {
error = "Not a name:capacity pair";
} else if (pair[0].empty()) {
error = "Volume name cannot be empty";
} else if (pair[0].find_first_of(os::PATH_SEPARATOR) != string::npos) {
error = "Volume name cannot contain '/'";
} else if (volumes.contains(pair[0])) {
error = "Volume name must be unique";
} else {
Try<Bytes> capacity = Bytes::parse(pair[1]);
if (capacity.isError()) {
error = capacity.error();
} else {
volumes.put(pair[0], capacity.get());
}
}
if (error.isSome()) {
cerr << "Failed to parse item '" << token << "' in 'volumes' flag: "
<< error->message;
return EXIT_FAILURE;
}
}
}
unique_ptr<TestCSIPlugin> plugin(new TestCSIPlugin(
flags.work_dir,
flags.endpoint,
flags.available_capacity,
volumes));
plugin->wait();
return EXIT_SUCCESS;
}