blob: b54d666a878576f5845027fe6f704e10195079e1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <limits>
#include <memory>
#include <thread>
#include <utility>
#include <vector>
#include <google/protobuf/map.h>
#include <google/protobuf/message.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/generic/async_generic_service.h>
#include <grpcpp/generic/generic_stub.h>
#include <mesos/type_utils.hpp>
#include <mesos/csi/types.hpp>
#include <mesos/csi/v0.hpp>
#include <mesos/csi/v1.hpp>
#include <process/grpc.hpp>
#include <process/http.hpp>
#include <stout/adaptor.hpp>
#include <stout/bytes.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
#include <stout/some.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
#include <stout/os/exists.hpp>
#include <stout/os/ls.hpp>
#include <stout/os/mkdir.hpp>
#include <stout/os/rmdir.hpp>
#include "csi/v0_utils.hpp"
#include "csi/v1_utils.hpp"
#include "linux/fs.hpp"
#include "logging/logging.hpp"
namespace http = process::http;
namespace fs = mesos::internal::fs;
using std::cerr;
using std::cout;
using std::endl;
using std::list;
using std::max;
using std::min;
using std::string;
using std::unique_ptr;
using std::vector;
using google::protobuf::Map;
using google::protobuf::MapPair;
using google::protobuf::RepeatedPtrField;
using grpc::AsyncGenericService;
using grpc::ByteBuffer;
using grpc::ClientContext;
using grpc::GenericClientAsyncResponseReader;
using grpc::GenericServerAsyncReaderWriter;
using grpc::GenericServerContext;
using grpc::GenericStub;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
using grpc::WriteOptions;
using mesos::csi::types::VolumeCapability;
using process::grpc::StatusError;
constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
constexpr char NODE_ID[] = "localhost";
constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64);
class Flags : public virtual mesos::internal::logging::Flags
{
public:
Flags()
{
const hashset<string> supportedApiVersions =
{mesos::csi::v0::API_VERSION, mesos::csi::v1::API_VERSION};
add(&Flags::api_version,
"api_version",
"If set, the plugin would only serve CSI calls of the specified API\n"
"version. Otherwise, the plugin serves all supported API versions by\n"
"default. (Supported values: " + stringify(supportedApiVersions) + ")",
[=](const Option<string>& apiVersion) {
return apiVersion.isNone() ||
supportedApiVersions.contains(apiVersion.get())
? Option<Error>::none() : Error("Unsupported API version");
});
add(&Flags::endpoint,
"endpoint",
"Path to the Unix domain socket the plugin should bind to.");
add(&Flags::work_dir,
"work_dir",
"Path to the work directory of the plugin.");
add(&Flags::available_capacity,
"available_capacity",
"The available disk capacity managed by the plugin, in addition\n"
"to the pre-existing volumes specified in the --volumes flag.");
add(&Flags::create_parameters,
"create_parameters",
"The parameters required for volume creation. The parameters are\n"
"specified as a semicolon-delimited list of param=value pairs.\n"
"(Example: 'param1=value1;param2=value2')");
add(&Flags::volumes,
"volumes",
"Creates preprovisioned volumes upon start-up. The volumes are\n"
"specified as a semicolon-delimited list of name:capacity pairs.\n"
"If a volume with the same name already exists, the pair will be\n"
"ignored. (Example: 'volume1:1GB;volume2:2GB')");
add(&Flags::forward,
"forward",
"If set, the plugin forwards all requests to the specified Unix\n"
"domain socket. (Example: 'unix:///path/to/socket')");
}
Option<string> api_version;
string endpoint;
string work_dir;
Bytes available_capacity;
Option<string> create_parameters;
Option<string> volumes;
Option<string> forward;
};
class TestCSIPlugin
: public csi::v0::Identity::Service,
public csi::v0::Controller::Service,
public csi::v0::Node::Service,
public csi::v1::Identity::Service,
public csi::v1::Controller::Service,
public csi::v1::Node::Service
{
public:
TestCSIPlugin(
const Option<string>& _apiVersion,
const string& _endpoint,
const string& _workDir,
const Bytes& _availableCapacity,
const hashmap<string, string>& _createParameters,
const hashmap<string, Bytes>& _volumes)
: apiVersion(_apiVersion),
endpoint(_endpoint),
workDir(_workDir),
availableCapacity(_availableCapacity),
createParameters(_createParameters.begin(), _createParameters.end())
{
// Construct the default mount volume capability.
defaultVolumeCapability.mutable_mount();
defaultVolumeCapability.mutable_access_mode()
->set_mode(VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
// Scan for preprovisioned volumes.
//
// TODO(jieyu): Consider not using CHECKs here.
Try<list<string>> paths = os::ls(workDir);
CHECK_SOME(paths);
foreach (const string& path, paths.get()) {
Try<VolumeInfo> volumeInfo = parseVolumePath(path);
CHECK_SOME(volumeInfo);
CHECK(!volumes.contains(volumeInfo->id));
volumes.put(volumeInfo->id, volumeInfo.get());
if (!_volumes.contains(volumeInfo->id)) {
CHECK_GE(availableCapacity, volumeInfo->size);
availableCapacity -= volumeInfo->size;
}
}
foreachpair (const string& name, const Bytes& capacity, _volumes) {
if (volumes.contains(name)) {
continue;
}
VolumeInfo volumeInfo;
volumeInfo.id = name;
volumeInfo.size = capacity;
const string path = getVolumePath(volumeInfo);
Try<Nothing> mkdir = os::mkdir(path);
CHECK_SOME(mkdir);
volumes.put(volumeInfo.id, volumeInfo);
}
}
void run();
// CSI v0 RPCs.
Status GetPluginInfo(
ServerContext* context,
const csi::v0::GetPluginInfoRequest* request,
csi::v0::GetPluginInfoResponse* response) override;
Status GetPluginCapabilities(
ServerContext* context,
const csi::v0::GetPluginCapabilitiesRequest* request,
csi::v0::GetPluginCapabilitiesResponse* response) override;
Status Probe(
ServerContext* context,
const csi::v0::ProbeRequest* request,
csi::v0::ProbeResponse* response) override;
Status CreateVolume(
ServerContext* context,
const csi::v0::CreateVolumeRequest* request,
csi::v0::CreateVolumeResponse* response) override;
Status DeleteVolume(
ServerContext* context,
const csi::v0::DeleteVolumeRequest* request,
csi::v0::DeleteVolumeResponse* response) override;
Status ControllerPublishVolume(
ServerContext* context,
const csi::v0::ControllerPublishVolumeRequest* request,
csi::v0::ControllerPublishVolumeResponse* response) override;
Status ControllerUnpublishVolume(
ServerContext* context,
const csi::v0::ControllerUnpublishVolumeRequest* request,
csi::v0::ControllerUnpublishVolumeResponse* response) override;
Status ValidateVolumeCapabilities(
ServerContext* context,
const csi::v0::ValidateVolumeCapabilitiesRequest* request,
csi::v0::ValidateVolumeCapabilitiesResponse* response) override;
Status ListVolumes(
ServerContext* context,
const csi::v0::ListVolumesRequest* request,
csi::v0::ListVolumesResponse* response) override;
Status GetCapacity(
ServerContext* context,
const csi::v0::GetCapacityRequest* request,
csi::v0::GetCapacityResponse* response) override;
Status ControllerGetCapabilities(
ServerContext* context,
const csi::v0::ControllerGetCapabilitiesRequest* request,
csi::v0::ControllerGetCapabilitiesResponse* response) override;
Status NodeStageVolume(
ServerContext* context,
const csi::v0::NodeStageVolumeRequest* request,
csi::v0::NodeStageVolumeResponse* response) override;
Status NodeUnstageVolume(
ServerContext* context,
const csi::v0::NodeUnstageVolumeRequest* request,
csi::v0::NodeUnstageVolumeResponse* response) override;
Status NodePublishVolume(
ServerContext* context,
const csi::v0::NodePublishVolumeRequest* request,
csi::v0::NodePublishVolumeResponse* response) override;
Status NodeUnpublishVolume(
ServerContext* context,
const csi::v0::NodeUnpublishVolumeRequest* request,
csi::v0::NodeUnpublishVolumeResponse* response) override;
Status NodeGetId(
ServerContext* context,
const csi::v0::NodeGetIdRequest* request,
csi::v0::NodeGetIdResponse* response) override;
Status NodeGetCapabilities(
ServerContext* context,
const csi::v0::NodeGetCapabilitiesRequest* request,
csi::v0::NodeGetCapabilitiesResponse* response) override;
// CSI v1 RPCs.
Status GetPluginInfo(
ServerContext* context,
const csi::v1::GetPluginInfoRequest* request,
csi::v1::GetPluginInfoResponse* response) override;
Status GetPluginCapabilities(
ServerContext* context,
const csi::v1::GetPluginCapabilitiesRequest* request,
csi::v1::GetPluginCapabilitiesResponse* response) override;
Status Probe(
ServerContext* context,
const csi::v1::ProbeRequest* request,
csi::v1::ProbeResponse* response) override;
Status CreateVolume(
ServerContext* context,
const csi::v1::CreateVolumeRequest* request,
csi::v1::CreateVolumeResponse* response) override;
Status DeleteVolume(
ServerContext* context,
const csi::v1::DeleteVolumeRequest* request,
csi::v1::DeleteVolumeResponse* response) override;
Status ControllerPublishVolume(
ServerContext* context,
const csi::v1::ControllerPublishVolumeRequest* request,
csi::v1::ControllerPublishVolumeResponse* response) override;
Status ControllerUnpublishVolume(
ServerContext* context,
const csi::v1::ControllerUnpublishVolumeRequest* request,
csi::v1::ControllerUnpublishVolumeResponse* response) override;
Status ValidateVolumeCapabilities(
ServerContext* context,
const csi::v1::ValidateVolumeCapabilitiesRequest* request,
csi::v1::ValidateVolumeCapabilitiesResponse* response) override;
Status ListVolumes(
ServerContext* context,
const csi::v1::ListVolumesRequest* request,
csi::v1::ListVolumesResponse* response) override;
Status GetCapacity(
ServerContext* context,
const csi::v1::GetCapacityRequest* request,
csi::v1::GetCapacityResponse* response) override;
Status ControllerGetCapabilities(
ServerContext* context,
const csi::v1::ControllerGetCapabilitiesRequest* request,
csi::v1::ControllerGetCapabilitiesResponse* response) override;
Status NodeStageVolume(
ServerContext* context,
const csi::v1::NodeStageVolumeRequest* request,
csi::v1::NodeStageVolumeResponse* response) override;
Status NodeUnstageVolume(
ServerContext* context,
const csi::v1::NodeUnstageVolumeRequest* request,
csi::v1::NodeUnstageVolumeResponse* response) override;
Status NodePublishVolume(
ServerContext* context,
const csi::v1::NodePublishVolumeRequest* request,
csi::v1::NodePublishVolumeResponse* response) override;
Status NodeUnpublishVolume(
ServerContext* context,
const csi::v1::NodeUnpublishVolumeRequest* request,
csi::v1::NodeUnpublishVolumeResponse* response) override;
Status NodeGetCapabilities(
ServerContext* context,
const csi::v1::NodeGetCapabilitiesRequest* request,
csi::v1::NodeGetCapabilitiesResponse* response) override;
Status NodeGetInfo(
ServerContext* context,
const csi::v1::NodeGetInfoRequest* request,
csi::v1::NodeGetInfoResponse* response) override;
private:
struct VolumeInfo
{
string id;
Bytes size;
};
string getVolumePath(const VolumeInfo& volumeInfo);
Try<VolumeInfo> parseVolumePath(const string& path);
Try<VolumeInfo, StatusError> createVolume(
const string& name,
const Bytes& requiredBytes,
const Bytes& limitBytes,
const RepeatedPtrField<VolumeCapability>& capabilities,
const Map<string, string> parameters);
Try<Nothing, StatusError> deleteVolume(const string& volumeId);
Try<Nothing, StatusError> controllerPublishVolume(
const string& volumeId,
const string& nodeId,
const VolumeCapability& capability,
bool readonly,
const Map<string, string>& volumeContext);
Try<Nothing, StatusError> controllerUnpublishVolume(
const string& volumeId, const string& nodeId);
// Returns `StatusError` if the volume does not exist; returns `Option<Error>`
// with an error set if the volume is not compatible with the given arguments.
Try<Option<Error>, StatusError> validateVolumeCapabilities(
const string& volumeId,
const Map<string, string>& volumeContext,
const RepeatedPtrField<VolumeCapability>& capabilities,
const Option<Map<string, string>>& parameters = None());
Try<vector<VolumeInfo>, StatusError> listVolumes(
const Option<int32_t>& maxEntries,
const Option<string>& startingToken);
Try<Bytes, StatusError> getCapacity(
const RepeatedPtrField<VolumeCapability>& capabilities,
const Map<string, string>& parameters);
Try<Nothing, StatusError> nodeStageVolume(
const string& volumeId,
const Map<string, string>& publishContext,
const string& stagingPath,
const VolumeCapability& capability,
const Map<string, string>& volumeContext);
Try<Nothing, StatusError> nodeUnstageVolume(
const string& volumeId, const string& stagingPath);
Try<Nothing, StatusError> nodePublishVolume(
const string& volumeId,
const Map<string, string>& publishContext,
const string& stagingPath,
const string& targetPath,
const VolumeCapability& capability,
bool readonly,
const Map<string, string>& volumeContext);
Try<Nothing, StatusError> nodeUnpublishVolume(
const string& volumeId, const string& targetPath);
const Option<string> apiVersion;
const string endpoint;
const string workDir;
Bytes availableCapacity;
VolumeCapability defaultVolumeCapability;
Map<string, string> createParameters;
hashmap<string, VolumeInfo> volumes;
};
void TestCSIPlugin::run()
{
ServerBuilder builder;
builder.AddListeningPort(endpoint, grpc::InsecureServerCredentials());
if (apiVersion.isNone() || apiVersion.get() == mesos::csi::v0::API_VERSION) {
builder.RegisterService(static_cast<csi::v0::Identity::Service*>(this));
builder.RegisterService(static_cast<csi::v0::Controller::Service*>(this));
builder.RegisterService(static_cast<csi::v0::Node::Service*>(this));
}
if (apiVersion.isNone() || apiVersion.get() == mesos::csi::v1::API_VERSION) {
builder.RegisterService(static_cast<csi::v1::Identity::Service*>(this));
builder.RegisterService(static_cast<csi::v1::Controller::Service*>(this));
builder.RegisterService(static_cast<csi::v1::Node::Service*>(this));
}
std::unique_ptr<Server> server = builder.BuildAndStart();
server->Wait();
}
Status TestCSIPlugin::GetPluginInfo(
ServerContext* context,
const csi::v0::GetPluginInfoRequest* request,
csi::v0::GetPluginInfoResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->set_name(PLUGIN_NAME);
response->set_vendor_version(MESOS_VERSION);
return Status::OK;
}
Status TestCSIPlugin::GetPluginCapabilities(
ServerContext* context,
const csi::v0::GetPluginCapabilitiesRequest* request,
csi::v0::GetPluginCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_service()->set_type(
csi::v0::PluginCapability::Service::CONTROLLER_SERVICE);
return Status::OK;
}
Status TestCSIPlugin::Probe(
ServerContext* context,
const csi::v0::ProbeRequest* request,
csi::v0::ProbeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
return Status::OK;
}
Status TestCSIPlugin::CreateVolume(
ServerContext* context,
const csi::v0::CreateVolumeRequest* request,
csi::v0::CreateVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<VolumeInfo, StatusError> result = createVolume(
request->name(),
request->capacity_range().required_bytes()
? request->capacity_range().required_bytes() : 1,
request->capacity_range().limit_bytes()
? request->capacity_range().limit_bytes()
: std::numeric_limits<int64_t>::max(),
mesos::csi::v0::devolve(request->volume_capabilities()),
request->parameters());
if (result.isError()) {
return result.error().status;
}
response->mutable_volume()->set_id(result->id);
response->mutable_volume()->set_capacity_bytes(result->size.bytes());
(*response->mutable_volume()->mutable_attributes())["path"] =
getVolumePath(result.get());
return Status::OK;
}
Status TestCSIPlugin::DeleteVolume(
ServerContext* context,
const csi::v0::DeleteVolumeRequest* request,
csi::v0::DeleteVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = deleteVolume(request->volume_id());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::ControllerPublishVolume(
ServerContext* context,
const csi::v0::ControllerPublishVolumeRequest* request,
csi::v0::ControllerPublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = controllerPublishVolume(
request->volume_id(),
request->node_id(),
mesos::csi::v0::devolve(request->volume_capability()),
request->readonly(),
request->volume_attributes());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::ControllerUnpublishVolume(
ServerContext* context,
const csi::v0::ControllerUnpublishVolumeRequest* request,
csi::v0::ControllerUnpublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result =
controllerUnpublishVolume(request->volume_id(), request->node_id());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::ValidateVolumeCapabilities(
ServerContext* context,
const csi::v0::ValidateVolumeCapabilitiesRequest* request,
csi::v0::ValidateVolumeCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Option<Error>, StatusError> result = validateVolumeCapabilities(
request->volume_id(),
request->volume_attributes(),
mesos::csi::v0::devolve(request->volume_capabilities()));
if (result.isError()) {
return result.error().status;
}
if (result->isSome()) {
response->set_supported(false);
response->set_message(result->get().message);
} else {
response->set_supported(true);
}
return Status::OK;
}
Status TestCSIPlugin::ListVolumes(
ServerContext* context,
const csi::v0::ListVolumesRequest* request,
csi::v0::ListVolumesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Try<vector<VolumeInfo>, StatusError> result = listVolumes(
request->max_entries()
? request->max_entries() : Option<int32_t>::none(),
!request->starting_token().empty()
? request->starting_token() : Option<string>::none());
if (result.isError()) {
return result.error().status;
}
foreach (const VolumeInfo& volumeInfo, result.get()) {
csi::v0::Volume* volume = response->add_entries()->mutable_volume();
volume->set_id(volumeInfo.id);
volume->set_capacity_bytes(volumeInfo.size.bytes());
(*volume->mutable_attributes())["path"] = getVolumePath(volumeInfo);
}
return Status::OK;
}
Status TestCSIPlugin::GetCapacity(
ServerContext* context,
const csi::v0::GetCapacityRequest* request,
csi::v0::GetCapacityResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Try<Bytes, StatusError> result = getCapacity(
mesos::csi::v0::devolve(request->volume_capabilities()),
request->parameters());
if (result.isError()) {
return result.error().status;
}
response->set_available_capacity(result->bytes());
return Status::OK;
}
Status TestCSIPlugin::ControllerGetCapabilities(
ServerContext* context,
const csi::v0::ControllerGetCapabilitiesRequest* request,
csi::v0::ControllerGetCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
const vector<csi::v0::ControllerServiceCapability::RPC::Type> rpcs = {
csi::v0::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME,
csi::v0::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME,
csi::v0::ControllerServiceCapability::RPC::GET_CAPACITY,
csi::v0::ControllerServiceCapability::RPC::LIST_VOLUMES,
};
foreach (const csi::v0::ControllerServiceCapability::RPC::Type rpc, rpcs) {
response->add_capabilities()->mutable_rpc()->set_type(rpc);
}
return Status::OK;
}
Status TestCSIPlugin::NodeStageVolume(
ServerContext* context,
const csi::v0::NodeStageVolumeRequest* request,
csi::v0::NodeStageVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = nodeStageVolume(
request->volume_id(),
request->publish_info(),
request->staging_target_path(),
mesos::csi::v0::devolve(request->volume_capability()),
request->volume_attributes());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodeUnstageVolume(
ServerContext* context,
const csi::v0::NodeUnstageVolumeRequest* request,
csi::v0::NodeUnstageVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result =
nodeUnstageVolume(request->volume_id(), request->staging_target_path());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodePublishVolume(
ServerContext* context,
const csi::v0::NodePublishVolumeRequest* request,
csi::v0::NodePublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = nodePublishVolume(
request->volume_id(),
request->publish_info(),
request->staging_target_path(),
request->target_path(),
mesos::csi::v0::devolve(request->volume_capability()),
request->readonly(),
request->volume_attributes());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodeUnpublishVolume(
ServerContext* context,
const csi::v0::NodeUnpublishVolumeRequest* request,
csi::v0::NodeUnpublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result =
nodeUnpublishVolume(request->volume_id(), request->target_path());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodeGetId(
ServerContext* context,
const csi::v0::NodeGetIdRequest* request,
csi::v0::NodeGetIdResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->set_node_id(NODE_ID);
return Status::OK;
}
Status TestCSIPlugin::NodeGetCapabilities(
ServerContext* context,
const csi::v0::NodeGetCapabilitiesRequest* request,
csi::v0::NodeGetCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_rpc()->set_type(
csi::v0::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME);
return Status::OK;
}
Status TestCSIPlugin::GetPluginInfo(
ServerContext* context,
const csi::v1::GetPluginInfoRequest* request,
csi::v1::GetPluginInfoResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// Since CSI v1 requires the plugin name to follow the (forward) domain name
// notation, we reverse the plugin's reverse-DNS name here.
response->set_name(
strings::join(".", adaptor::reverse(strings::split(".", PLUGIN_NAME))));
response->set_vendor_version(MESOS_VERSION);
return Status::OK;
}
Status TestCSIPlugin::GetPluginCapabilities(
ServerContext* context,
const csi::v1::GetPluginCapabilitiesRequest* request,
csi::v1::GetPluginCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_service()->set_type(
csi::v1::PluginCapability::Service::CONTROLLER_SERVICE);
return Status::OK;
}
Status TestCSIPlugin::Probe(
ServerContext* context,
const csi::v1::ProbeRequest* request,
csi::v1::ProbeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->mutable_ready()->set_value(true);
return Status::OK;
}
Status TestCSIPlugin::CreateVolume(
ServerContext* context,
const csi::v1::CreateVolumeRequest* request,
csi::v1::CreateVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<VolumeInfo, StatusError> result = createVolume(
request->name(),
request->capacity_range().required_bytes()
? request->capacity_range().required_bytes() : 1,
request->capacity_range().limit_bytes()
? request->capacity_range().limit_bytes()
: std::numeric_limits<int64_t>::max(),
mesos::csi::v1::devolve(request->volume_capabilities()),
request->parameters());
if (result.isError()) {
return result.error().status;
}
response->mutable_volume()->set_volume_id(result->id);
response->mutable_volume()->set_capacity_bytes(result->size.bytes());
(*response->mutable_volume()->mutable_volume_context())["path"] =
getVolumePath(result.get());
return Status::OK;
}
Status TestCSIPlugin::DeleteVolume(
ServerContext* context,
const csi::v1::DeleteVolumeRequest* request,
csi::v1::DeleteVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = deleteVolume(request->volume_id());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::ControllerPublishVolume(
ServerContext* context,
const csi::v1::ControllerPublishVolumeRequest* request,
csi::v1::ControllerPublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = controllerPublishVolume(
request->volume_id(),
request->node_id(),
mesos::csi::v1::devolve(request->volume_capability()),
request->readonly(),
request->volume_context());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::ControllerUnpublishVolume(
ServerContext* context,
const csi::v1::ControllerUnpublishVolumeRequest* request,
csi::v1::ControllerUnpublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result =
controllerUnpublishVolume(request->volume_id(), request->node_id());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::ValidateVolumeCapabilities(
ServerContext* context,
const csi::v1::ValidateVolumeCapabilitiesRequest* request,
csi::v1::ValidateVolumeCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Option<Error>, StatusError> result = validateVolumeCapabilities(
request->volume_id(),
request->volume_context(),
mesos::csi::v1::devolve(request->volume_capabilities()),
request->parameters());
if (result.isError()) {
return result.error().status;
}
if (result->isSome()) {
response->set_message(result->get().message);
} else {
*response->mutable_confirmed()->mutable_volume_context() =
request->volume_context();
*response->mutable_confirmed()->mutable_volume_capabilities() =
request->volume_capabilities();
*response->mutable_confirmed()->mutable_parameters() =
request->parameters();
}
return Status::OK;
}
Status TestCSIPlugin::ListVolumes(
ServerContext* context,
const csi::v1::ListVolumesRequest* request,
csi::v1::ListVolumesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Try<vector<VolumeInfo>, StatusError> result = listVolumes(
request->max_entries()
? request->max_entries() : Option<int32_t>::none(),
!request->starting_token().empty()
? request->starting_token() : Option<string>::none());
if (result.isError()) {
return result.error().status;
}
foreach (const VolumeInfo& volumeInfo, result.get()) {
csi::v1::Volume* volume = response->add_entries()->mutable_volume();
volume->set_volume_id(volumeInfo.id);
volume->set_capacity_bytes(volumeInfo.size.bytes());
(*volume->mutable_volume_context())["path"] = getVolumePath(volumeInfo);
}
return Status::OK;
}
Status TestCSIPlugin::GetCapacity(
ServerContext* context,
const csi::v1::GetCapacityRequest* request,
csi::v1::GetCapacityResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Try<Bytes, StatusError> result = getCapacity(
mesos::csi::v1::devolve(request->volume_capabilities()),
request->parameters());
if (result.isError()) {
return result.error().status;
}
response->set_available_capacity(result->bytes());
return Status::OK;
}
string TestCSIPlugin::getVolumePath(const VolumeInfo& volumeInfo)
{
return path::join(
workDir,
strings::join("-", stringify(volumeInfo.size), volumeInfo.id));
}
Status TestCSIPlugin::ControllerGetCapabilities(
ServerContext* context,
const csi::v1::ControllerGetCapabilitiesRequest* request,
csi::v1::ControllerGetCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
const vector<csi::v1::ControllerServiceCapability::RPC::Type> rpcs = {
csi::v1::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME,
csi::v1::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME,
csi::v1::ControllerServiceCapability::RPC::GET_CAPACITY,
csi::v1::ControllerServiceCapability::RPC::LIST_VOLUMES,
};
foreach (const csi::v1::ControllerServiceCapability::RPC::Type rpc, rpcs) {
response->add_capabilities()->mutable_rpc()->set_type(rpc);
}
return Status::OK;
}
Status TestCSIPlugin::NodeStageVolume(
ServerContext* context,
const csi::v1::NodeStageVolumeRequest* request,
csi::v1::NodeStageVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result = nodeStageVolume(
request->volume_id(),
request->publish_context(),
request->staging_target_path(),
mesos::csi::v1::devolve(request->volume_capability()),
request->volume_context());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodeUnstageVolume(
ServerContext* context,
const csi::v1::NodeUnstageVolumeRequest* request,
csi::v1::NodeUnstageVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result =
nodeUnstageVolume(request->volume_id(), request->staging_target_path());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodePublishVolume(
ServerContext* context,
const csi::v1::NodePublishVolumeRequest* request,
csi::v1::NodePublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
// Since CSI v1 states that creation of the target path is the responsibility
// of the plugin, we create the mount point here if it does not exist.
if (!os::exists(request->target_path())) {
Try<Nothing> mkdir = os::mkdir(request->target_path());
if (mkdir.isError()) {
return Status(
grpc::INTERNAL,
"Failed to create target path '" + request->target_path() +
"' for volume '" + request->volume_id() + "': " + mkdir.error());
}
}
Try<Nothing, StatusError> result = nodePublishVolume(
request->volume_id(),
request->publish_context(),
request->staging_target_path(),
request->target_path(),
mesos::csi::v1::devolve(request->volume_capability()),
request->readonly(),
request->volume_context());
if (result.isError()) {
return result.error().status;
}
return Status::OK;
}
Status TestCSIPlugin::NodeUnpublishVolume(
ServerContext* context,
const csi::v1::NodeUnpublishVolumeRequest* request,
csi::v1::NodeUnpublishVolumeResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate the request.
Try<Nothing, StatusError> result =
nodeUnpublishVolume(request->volume_id(), request->target_path());
if (result.isError()) {
return result.error().status;
}
// Since CSI v1 requires the plugin to delete the target path, we delete it
// here if it has not been deleted by a previous attempt.
if (os::exists(request->target_path())) {
Try<Nothing> rmdir = os::rmdir(request->target_path());
if (rmdir.isError()) {
return Status(
grpc::INTERNAL,
"Failed to remove target path '" + request->target_path() +
"' for volume '" + request->volume_id() + "': " + rmdir.error());
}
}
return Status::OK;
}
Status TestCSIPlugin::NodeGetCapabilities(
ServerContext* context,
const csi::v1::NodeGetCapabilitiesRequest* request,
csi::v1::NodeGetCapabilitiesResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_capabilities()->mutable_rpc()->set_type(
csi::v1::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME);
return Status::OK;
}
Status TestCSIPlugin::NodeGetInfo(
ServerContext* context,
const csi::v1::NodeGetInfoRequest* request,
csi::v1::NodeGetInfoResponse* response)
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->set_node_id(NODE_ID);
return Status::OK;
}
Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath(
const string& path)
{
size_t pos = path.find_first_of("-");
if (pos == string::npos) {
return Error("Cannot find the delimiter");
}
string bytesString = path.substr(0, path.find_first_of("-"));
string id = path.substr(path.find_first_of("-") + 1);
Try<Bytes> bytes = Bytes::parse(bytesString);
if (bytes.isError()) {
return Error("Failed to parse bytes: " + bytes.error());
}
VolumeInfo volumeInfo;
volumeInfo.id = id;
volumeInfo.size = bytes.get();
return volumeInfo;
}
Try<TestCSIPlugin::VolumeInfo, StatusError> TestCSIPlugin::createVolume(
const string& name,
const Bytes& requiredBytes,
const Bytes& limitBytes,
const RepeatedPtrField<VolumeCapability>& capabilities,
const Map<string, string> parameters)
{
// The volume ID is determined by `name`, with reserved characters escaped.
const string volumeId = http::encode(name);
foreach (const VolumeCapability& capability, capabilities) {
if (capability != defaultVolumeCapability) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Unsupported volume capabilities"));
}
}
if (parameters != createParameters) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Unsupported create parameters"));
}
if (volumes.contains(volumeId)) {
const VolumeInfo& volumeInfo = volumes.at(volumeId);
if (volumeInfo.size > limitBytes) {
return StatusError(Status(
grpc::ALREADY_EXISTS, "Cannot satisfy limit bytes"));
}
if (volumeInfo.size < requiredBytes) {
return StatusError(Status(
grpc::ALREADY_EXISTS, "Cannot satisfy required bytes"));
}
return volumeInfo;
} else {
if (availableCapacity < requiredBytes) {
return StatusError(Status(grpc::OUT_OF_RANGE, "Insufficient capacity"));
}
VolumeInfo volumeInfo;
volumeInfo.id = volumeId;
// We assume that `requiredBytes <= limitBytes` has been verified.
const Bytes defaultSize = min(availableCapacity, DEFAULT_VOLUME_CAPACITY);
volumeInfo.size = min(max(defaultSize, requiredBytes), limitBytes);
const string path = getVolumePath(volumeInfo);
Try<Nothing> mkdir = os::mkdir(path);
if (mkdir.isError()) {
return StatusError(Status(
grpc::INTERNAL,
"Failed to create volume '" + volumeInfo.id + "': " + mkdir.error()));
}
CHECK_GE(availableCapacity, volumeInfo.size);
availableCapacity -= volumeInfo.size;
volumes.put(volumeInfo.id, volumeInfo);
return volumeInfo;
}
UNREACHABLE();
}
Try<Nothing, StatusError> TestCSIPlugin::deleteVolume(const string& volumeId)
{
if (!volumes.contains(volumeId)) {
// Return a success for idempotency.
return Nothing();
}
const VolumeInfo& volumeInfo = volumes.at(volumeId);
const string path = getVolumePath(volumeInfo);
Try<Nothing> rmdir = os::rmdir(path);
if (rmdir.isError()) {
return StatusError(Status(
grpc::INTERNAL,
"Failed to delete volume '" + volumeId + "': " + rmdir.error()));
}
availableCapacity += volumeInfo.size;
volumes.erase(volumeInfo.id);
return Nothing();
}
Try<Nothing, StatusError> TestCSIPlugin::controllerPublishVolume(
const string& volumeId,
const string& nodeId,
const VolumeCapability& capability,
bool readonly,
const Map<string, string>& volumeContext)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
if (nodeId != NODE_ID) {
return StatusError(Status(
grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist"));
}
if (capability != defaultVolumeCapability) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
}
if (readonly) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Unsupported read-only mode"));
}
const VolumeInfo& volumeInfo = volumes.at(volumeId);
const string path = getVolumePath(volumeInfo);
if (!volumeContext.count("path") || volumeContext.at("path") != path) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Invalid volume context"));
}
// Do nothing.
return Nothing();
}
Try<Nothing, StatusError> TestCSIPlugin::controllerUnpublishVolume(
const string& volumeId, const string& nodeId)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
if (nodeId != NODE_ID) {
return StatusError(Status(
grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist"));
}
// Do nothing.
return Nothing();
}
Try<Option<Error>, StatusError> TestCSIPlugin::validateVolumeCapabilities(
const string& volumeId,
const Map<string, string>& volumeContext,
const RepeatedPtrField<VolumeCapability>& capabilities,
const Option<Map<string, string>>& parameters)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
const VolumeInfo& volumeInfo = volumes.at(volumeId);
const string path = getVolumePath(volumeInfo);
if (!volumeContext.count("path") || volumeContext.at("path") != path) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Invalid volume context"));
}
foreach (const VolumeCapability& capability, capabilities) {
if (capability != defaultVolumeCapability) {
return Some(Error("Unsupported volume capabilities"));
}
}
if (parameters.isSome() && parameters.get() != createParameters) {
return Some(Error("Mismatched parameters"));
}
return None();
}
Try<vector<TestCSIPlugin::VolumeInfo>, StatusError> TestCSIPlugin::listVolumes(
const Option<int32_t>& maxEntries,
const Option<string>& startingToken)
{
// TODO(chhsiao): Support max entries.
if (maxEntries.isSome()) {
return StatusError(Status(
grpc::ABORTED, "Specifying max entries is not supported"));
}
// TODO(chhsiao): Support starting token.
if (startingToken.isSome()) {
return StatusError(Status(
grpc::ABORTED, "Specifying starting token is not supported"));
}
return volumes.values();
}
Try<Bytes, StatusError> TestCSIPlugin::getCapacity(
const RepeatedPtrField<VolumeCapability>& capabilities,
const Map<string, string>& parameters)
{
// We report zero capacity if any capability other than the default mount
// volume capability is given. If no capacity is given, the total available
// capacity will be returned.
foreach (const VolumeCapability& capability, capabilities) {
if (capability != defaultVolumeCapability) {
return Bytes(0);
}
}
if (parameters != createParameters) {
return Bytes(0);
}
return availableCapacity;
}
Try<Nothing, StatusError> TestCSIPlugin::nodeStageVolume(
const string& volumeId,
const Map<string, string>& publishContext,
const string& stagingPath,
const VolumeCapability& capability,
const Map<string, string>& volumeContext)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
if (!publishContext.empty()) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Invalid publish context"));
}
if (!os::exists(stagingPath)) {
return StatusError(Status(
grpc::INVALID_ARGUMENT,
"Staging path '" + stagingPath + "' does not exist"));
}
if (capability != defaultVolumeCapability) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
}
const VolumeInfo& volumeInfo = volumes.at(volumeId);
const string path = getVolumePath(volumeInfo);
if (!volumeContext.count("path") || volumeContext.at("path") != path) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Invalid volume context"));
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return StatusError(Status(
grpc::INTERNAL, "Failed to get mount table: " + table.error()));
}
if (std::any_of(
table->entries.begin(),
table->entries.end(),
[&](const fs::MountInfoTable::Entry& entry) {
return entry.target == stagingPath;
})) {
return Nothing();
}
Try<Nothing> mount = fs::mount(path, stagingPath, None(), MS_BIND, None());
if (mount.isError()) {
return StatusError(Status(
grpc::INTERNAL,
"Failed to mount from '" + path + "' to '" + stagingPath +
"': " + mount.error()));
}
return Nothing();
}
Try<Nothing, StatusError> TestCSIPlugin::nodeUnstageVolume(
const string& volumeId, const string& stagingPath)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return StatusError(Status(
grpc::INTERNAL, "Failed to get mount table: " + table.error()));
}
if (std::none_of(
table->entries.begin(),
table->entries.end(),
[&](const fs::MountInfoTable::Entry& entry) {
return entry.target == stagingPath;
})) {
return Nothing();
}
Try<Nothing> unmount = fs::unmount(stagingPath);
if (unmount.isError()) {
return StatusError(Status(
grpc::INTERNAL,
"Failed to unmount '" + stagingPath + "': " + unmount.error()));
}
return Nothing();
}
Try<Nothing, StatusError> TestCSIPlugin::nodePublishVolume(
const string& volumeId,
const Map<string, string>& publishContext,
const string& stagingPath,
const string& targetPath,
const VolumeCapability& capability,
bool readonly,
const Map<string, string>& volumeContext)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
if (!publishContext.empty()) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Invalid publish context"));
}
if (!os::exists(targetPath)) {
return StatusError(Status(
grpc::INVALID_ARGUMENT,
"Target path '" + targetPath + "' does not exist"));
}
if (capability != defaultVolumeCapability) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
}
const VolumeInfo& volumeInfo = volumes.at(volumeId);
const string path = getVolumePath(volumeInfo);
if (!volumeContext.count("path") || volumeContext.at("path") != path) {
return StatusError(Status(
grpc::INVALID_ARGUMENT, "Invalid volume context"));
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return StatusError(Status(
grpc::INTERNAL, "Failed to get mount table: " + table.error()));
}
if (std::none_of(
table->entries.begin(),
table->entries.end(),
[&](const fs::MountInfoTable::Entry& entry) {
return entry.target == stagingPath;
})) {
return StatusError(Status(
grpc::FAILED_PRECONDITION,
"Volume '" + volumeId + "' has not been staged yet"));
}
if (std::any_of(
table->entries.begin(),
table->entries.end(),
[&](const fs::MountInfoTable::Entry& entry) {
return entry.target == targetPath;
})) {
return Nothing();
}
Try<Nothing> mount = fs::mount(
stagingPath,
targetPath,
None(),
MS_BIND | (readonly ? MS_RDONLY : 0),
None());
if (mount.isError()) {
return StatusError(Status(
grpc::INTERNAL,
"Failed to mount from '" + stagingPath + "' to '" + targetPath +
"': " + mount.error()));
}
return Nothing();
}
Try<Nothing, StatusError> TestCSIPlugin::nodeUnpublishVolume(
const string& volumeId, const string& targetPath)
{
if (!volumes.contains(volumeId)) {
return StatusError(Status(
grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
}
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return StatusError(Status(
grpc::INTERNAL, "Failed to get mount table: " + table.error()));
}
if (std::none_of(
table->entries.begin(),
table->entries.end(),
[&](const fs::MountInfoTable::Entry& entry) {
return entry.target == targetPath;
})) {
return Nothing();
}
Try<Nothing> unmount = fs::unmount(targetPath);
if (unmount.isError()) {
return StatusError(Status(
grpc::INTERNAL,
"Failed to unmount '" + targetPath + "': " + unmount.error()));
}
return Nothing();
}
// Serves CSI calls from the given endpoint through forwarding the calls to
// another CSI endpoint and returning back the results.
class CSIProxy
{
public:
CSIProxy(const string& _endpoint, const string& forward)
: endpoint(_endpoint),
stub(grpc::CreateChannel(forward, grpc::InsecureChannelCredentials())),
service(new AsyncGenericService()) {}
void run();
private:
struct Call
{
Call() : state(State::INITIALIZED), serverReaderWriter(&serverContext) {}
enum class State {
INITIALIZED,
REQUESTED,
FORWARDING,
FINISHING,
} state;
GenericServerContext serverContext;
GenericServerAsyncReaderWriter serverReaderWriter;
ClientContext clientContext;
std::unique_ptr<GenericClientAsyncResponseReader> clientReader;
ByteBuffer request;
ByteBuffer response;
Status status;
};
void serve(ServerCompletionQueue* completionQueue);
const string endpoint;
GenericStub stub;
std::unique_ptr<AsyncGenericService> service;
};
void CSIProxy::run()
{
ServerBuilder builder;
builder.AddListeningPort(endpoint, grpc::InsecureServerCredentials());
builder.RegisterAsyncGenericService(service.get());
std::unique_ptr<ServerCompletionQueue> completionQueue =
builder.AddCompletionQueue();
std::unique_ptr<Server> server = builder.BuildAndStart();
std::thread looper(std::bind(&CSIProxy::serve, this, completionQueue.get()));
server->Wait();
// Shutdown the completion queue after the server has been shut down. The
// looper thread will then drain the queue before finishing. See:
// https://grpc.io/grpc/cpp/classgrpc_1_1_server_builder.html#a960d55977e1aef56fd7b582037a01bbd // NOLINT
completionQueue->Shutdown();
looper.join();
}
// The lifecycle of a forwarded CSI call is shown as follows. The transitions
// happen after the completions of the API calls.
//
// Server-side
// +-------------+ +-------------+ WriteAndFinish +---+
// | INITIALIZED | | FINISHING +----------------> X |
// +------+------+ +------^------+ +---+
// Server-side | | Client-side
// RequestCall | Server-side | Finish (unary call)
// +------v------+ Read +------+------+
// | REQUESTED +-------------> FORWARDING |
// +-------------+ +-------------+
//
void CSIProxy::serve(ServerCompletionQueue* completionQueue)
{
// Serve the first call. The ownership of the `Call` object is passed to the
// completion queue, and will be retrieved later in the loop below.
Call* first = new Call();
service->RequestCall(
&first->serverContext,
&first->serverReaderWriter,
completionQueue,
completionQueue,
first);
void* tag;
bool ok = false;
// See the link below for details of `ok`:
// https://grpc.io/grpc/cpp/classgrpc_1_1_completion_queue.html#a86d9810ced694e50f7987ac90b9f8c1a // NOLINT
while (completionQueue->Next(&tag, &ok)) {
// Retrieve the ownership of the next ready `Call` object from the
// completion queue.
Call* call = reinterpret_cast<Call*>(tag);
switch (call->state) {
case Call::State::INITIALIZED: {
if (!ok) {
// Server-side `RequestCall`: the server has been shutdown so continue
// to drain the queue.
continue;
}
call->state = Call::State::REQUESTED;
// Make a server-side `Read` call and return the ownership back to the
// completion queue.
call->serverReaderWriter.Read(&call->request, call);
// Serve the next call while processing this one. The ownership of the
// new `Call` object is passed to the completion queue, and will be
// retrieved in a later iteration.
Call* next = new Call();
service->RequestCall(
&next->serverContext,
&next->serverReaderWriter,
completionQueue,
completionQueue,
next);
break;
}
case Call::State::REQUESTED: {
if (!ok) {
// Server-side `Read`: the client has done a `WritesDone` already, so
// clean up the call and move on to the next one.
delete call;
continue;
}
LOG(INFO) << "Forwarding " << call->serverContext.method() << " call";
call->state = Call::State::FORWARDING;
call->clientContext.set_wait_for_ready(true);
call->clientContext.set_deadline(call->serverContext.deadline());
foreachpair (const grpc::string_ref& key,
const grpc::string_ref& value,
call->serverContext.client_metadata()) {
call->clientContext.AddMetadata(
grpc::string(key.data(), key.size()),
grpc::string(value.data(), value.size()));
}
call->clientReader = stub.PrepareUnaryCall(
&call->clientContext,
call->serverContext.method(),
call->request,
completionQueue);
call->clientReader->StartCall();
// Make a client-side `Finish` call for the unary RPC and return the
// ownership back to the completion queue.
call->clientReader->Finish(&call->response, &call->status, call);
break;
}
case Call::State::FORWARDING: {
// Client-side `Finish` for unary RPCs: `ok` should always be true.
CHECK(ok);
call->state = Call::State::FINISHING;
// Make a server-side `WriteAndFinish` call and return the ownership
// back to the completion queue.
call->serverReaderWriter.WriteAndFinish(
call->response, WriteOptions(), call->status, call);
break;
}
case Call::State::FINISHING: {
if (!ok) {
// Server-side `WriteAndFinish`: the response hasn't gone to the wire.
LOG(ERROR) << "Failed to forward response for "
<< call->serverContext.method() << " call";
}
// The call is completed so clean it up.
delete call;
break;
}
}
}
}
int main(int argc, char** argv)
{
Flags flags;
Try<flags::Warnings> load = flags.load("CSI_", argc, argv);
if (flags.help) {
cout << flags.usage() << endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
cerr << flags.usage(load.error()) << endl;
return EXIT_FAILURE;
}
mesos::internal::logging::initialize(argv[0], true, flags);
// Log any flag warnings.
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
Try<Nothing> mkdir = os::mkdir(flags.work_dir);
if (mkdir.isError()) {
cerr << "Failed to create working directory '" << flags.work_dir
<< "': " << mkdir.error() << endl;
return EXIT_FAILURE;
}
hashmap<string, string> createParameters;
if (flags.create_parameters.isSome()) {
foreachpair (const string& param,
const vector<string>& values,
strings::pairs(flags.create_parameters.get(), ";", "=")) {
Option<Error> error;
if (values.size() != 1) {
error = "Parameter keys must be unique";
} else {
createParameters.put(param, values[0]);
}
if (error.isSome()) {
cerr << "Failed to parse the '--create_parameters' flags: "
<< error->message << endl;
return EXIT_FAILURE;
}
}
}
hashmap<string, Bytes> volumes;
if (flags.volumes.isSome()) {
foreachpair (const string& name,
const vector<string>& capacities,
strings::pairs(flags.volumes.get(), ";", ":")) {
Option<Error> error;
if (strings::contains(name, stringify(os::PATH_SEPARATOR))) {
error =
"Volume name cannot contain '" + stringify(os::PATH_SEPARATOR) + "'";
} else if (capacities.size() != 1) {
error = "Volume name must be unique";
} else if (volumes.contains(name)) {
error = "Volume '" + name + "' already exists";
} else {
Try<Bytes> capacity = Bytes::parse(capacities[0]);
if (capacity.isError()) {
error = capacity.error();
} else {
volumes.put(name, capacity.get());
}
}
if (error.isSome()) {
cerr << "Failed to parse the '--volumes' flag: " << error.get() << endl;
return EXIT_FAILURE;
}
}
}
// Terminate the plugin if the endpoint socket file already exists to simulate
// an `EADDRINUSE` bind error.
const string endpointPath =
strings::remove(flags.endpoint, "unix://", strings::PREFIX);
if (os::exists(endpointPath)) {
cerr << "Failed to create endpoint '" << endpointPath << "': already exists"
<< endl;
return EXIT_FAILURE;
}
if (flags.forward.isSome()) {
CSIProxy proxy(flags.endpoint, flags.forward.get());
proxy.run();
} else {
TestCSIPlugin plugin(
flags.api_version,
flags.endpoint,
flags.work_dir,
flags.available_capacity,
createParameters,
volumes);
plugin.run();
}
}