blob: 82dae64cd20e17c9ac8755eaf3764a30d0727509 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "tests/mock_csi_plugin.hpp"
#include <algorithm>
#include <stout/bytes.hpp>
using std::string;
using std::unique_ptr;
using grpc::ChannelArguments;
using grpc::InsecureServerCredentials;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using process::grpc::client::Connection;
using testing::_;
using testing::A;
using testing::Invoke;
using testing::Return;
namespace mesos {
namespace internal {
namespace tests {
// Installs a default, repeatable gmock expectation for every CSI v0 and CSI v1
// RPC mocked by this class. Unless noted otherwise an RPC simply returns
// `Status::OK` and leaves the response message untouched; the capability,
// `CreateVolume` and `GetCapacity` RPCs additionally fill in their responses
// so that the mock can be used with the test CSI plugin in forwarding mode.
MockCSIPlugin::MockCSIPlugin()
{
  // -------------------------------------------------------------------------
  // CSI v0 defaults.
  // -------------------------------------------------------------------------

  EXPECT_CALL(*this, GetPluginInfo(_, _, A<csi::v0::GetPluginInfoResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Advertise every plugin service capability by default for testing with the
  // test CSI plugin in forwarding mode.
  EXPECT_CALL(*this, GetPluginCapabilities(
      _, _, A<csi::v0::GetPluginCapabilitiesResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v0::GetPluginCapabilitiesRequest*,
        csi::v0::GetPluginCapabilitiesResponse* reply) {
      auto* capability = reply->add_capabilities();
      capability->mutable_service()->set_type(
          csi::v0::PluginCapability::Service::CONTROLLER_SERVICE);

      return Status::OK;
    }));

  EXPECT_CALL(*this, Probe(_, _, A<csi::v0::ProbeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Succeed by default for testing with the test CSI plugin in forwarding
  // mode. The created volume reports the larger of the requested required and
  // limit sizes as its capacity and uses the requested name as its ID.
  EXPECT_CALL(*this, CreateVolume(_, _, A<csi::v0::CreateVolumeResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v0::CreateVolumeRequest* req,
        csi::v0::CreateVolumeResponse* reply) {
      const auto& range = req->capacity_range();

      auto* volume = reply->mutable_volume();
      volume->set_capacity_bytes(
          std::max(range.required_bytes(), range.limit_bytes()));
      volume->set_id(req->name());

      return Status::OK;
    }));

  EXPECT_CALL(*this, DeleteVolume(_, _, A<csi::v0::DeleteVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ControllerPublishVolume(
      _, _, A<csi::v0::ControllerPublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ControllerUnpublishVolume(
      _, _, A<csi::v0::ControllerUnpublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ValidateVolumeCapabilities(
      _, _, A<csi::v0::ValidateVolumeCapabilitiesResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Report an arbitrary available capacity (4 GB) by default for testing with
  // the test CSI plugin in forwarding mode.
  EXPECT_CALL(*this, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v0::GetCapacityRequest*,
        csi::v0::GetCapacityResponse* reply) {
      const auto available = Gigabytes(4).bytes();
      reply->set_available_capacity(available);

      return Status::OK;
    }));

  // Advertise every controller RPC capability by default for testing with the
  // test CSI plugin in forwarding mode.
  EXPECT_CALL(*this, ControllerGetCapabilities(
      _, _, A<csi::v0::ControllerGetCapabilitiesResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v0::ControllerGetCapabilitiesRequest*,
        csi::v0::ControllerGetCapabilitiesResponse* reply) {
      // Capabilities are appended in the order listed here.
      for (const auto type : {
          csi::v0::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME,
          csi::v0::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME,
          csi::v0::ControllerServiceCapability::RPC::LIST_VOLUMES,
          csi::v0::ControllerServiceCapability::RPC::GET_CAPACITY}) {
        reply->add_capabilities()->mutable_rpc()->set_type(type);
      }

      return Status::OK;
    }));

  EXPECT_CALL(*this, NodeStageVolume(
      _, _, A<csi::v0::NodeStageVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeUnstageVolume(
      _, _, A<csi::v0::NodeUnstageVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodePublishVolume(
      _, _, A<csi::v0::NodePublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeUnpublishVolume(
      _, _, A<csi::v0::NodeUnpublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeGetId(_, _, A<csi::v0::NodeGetIdResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Advertise every node RPC capability by default for testing with the test
  // CSI plugin in forwarding mode.
  EXPECT_CALL(*this, NodeGetCapabilities(
      _, _, A<csi::v0::NodeGetCapabilitiesResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v0::NodeGetCapabilitiesRequest*,
        csi::v0::NodeGetCapabilitiesResponse* reply) {
      auto* capability = reply->add_capabilities();
      capability->mutable_rpc()->set_type(
          csi::v0::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME);

      return Status::OK;
    }));

  // -------------------------------------------------------------------------
  // CSI v1 defaults.
  // -------------------------------------------------------------------------

  EXPECT_CALL(*this, GetPluginInfo(_, _, A<csi::v1::GetPluginInfoResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Advertise every plugin service capability by default for testing with the
  // test CSI plugin in forwarding mode.
  EXPECT_CALL(*this, GetPluginCapabilities(
      _, _, A<csi::v1::GetPluginCapabilitiesResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v1::GetPluginCapabilitiesRequest*,
        csi::v1::GetPluginCapabilitiesResponse* reply) {
      // Capabilities are appended in the order listed here.
      for (const auto type : {
          csi::v1::PluginCapability::Service::CONTROLLER_SERVICE,
          csi::v1::PluginCapability::Service::VOLUME_ACCESSIBILITY_CONSTRAINTS
          }) {
        reply->add_capabilities()->mutable_service()->set_type(type);
      }

      return Status::OK;
    }));

  EXPECT_CALL(*this, Probe(_, _, A<csi::v1::ProbeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Succeed by default for testing with the test CSI plugin in forwarding
  // mode. The created volume reports the larger of the requested required and
  // limit sizes as its capacity and uses the requested name as its volume ID.
  EXPECT_CALL(*this, CreateVolume(_, _, A<csi::v1::CreateVolumeResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v1::CreateVolumeRequest* req,
        csi::v1::CreateVolumeResponse* reply) {
      const auto& range = req->capacity_range();

      auto* volume = reply->mutable_volume();
      volume->set_capacity_bytes(
          std::max(range.required_bytes(), range.limit_bytes()));
      volume->set_volume_id(req->name());

      return Status::OK;
    }));

  EXPECT_CALL(*this, DeleteVolume(_, _, A<csi::v1::DeleteVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ControllerPublishVolume(
      _, _, A<csi::v1::ControllerPublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ControllerUnpublishVolume(
      _, _, A<csi::v1::ControllerUnpublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ValidateVolumeCapabilities(
      _, _, A<csi::v1::ValidateVolumeCapabilitiesResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Report an arbitrary available capacity (4 GB) by default for testing with
  // the test CSI plugin in forwarding mode.
  EXPECT_CALL(*this, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v1::GetCapacityRequest*,
        csi::v1::GetCapacityResponse* reply) {
      const auto available = Gigabytes(4).bytes();
      reply->set_available_capacity(available);

      return Status::OK;
    }));

  // Advertise every controller RPC capability by default for testing with the
  // test CSI plugin in forwarding mode.
  EXPECT_CALL(*this, ControllerGetCapabilities(
      _, _, A<csi::v1::ControllerGetCapabilitiesResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v1::ControllerGetCapabilitiesRequest*,
        csi::v1::ControllerGetCapabilitiesResponse* reply) {
      // Capabilities are appended in the order listed here.
      for (const auto type : {
          csi::v1::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME,
          csi::v1::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME,
          csi::v1::ControllerServiceCapability::RPC::LIST_VOLUMES,
          csi::v1::ControllerServiceCapability::RPC::GET_CAPACITY,
          csi::v1::ControllerServiceCapability::RPC::CREATE_DELETE_SNAPSHOT,
          csi::v1::ControllerServiceCapability::RPC::LIST_SNAPSHOTS,
          csi::v1::ControllerServiceCapability::RPC::CLONE_VOLUME,
          csi::v1::ControllerServiceCapability::RPC::PUBLISH_READONLY,
          csi::v1::ControllerServiceCapability::RPC::EXPAND_VOLUME}) {
        reply->add_capabilities()->mutable_rpc()->set_type(type);
      }

      return Status::OK;
    }));

  EXPECT_CALL(*this, CreateSnapshot(
      _, _, A<csi::v1::CreateSnapshotResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, DeleteSnapshot(
      _, _, A<csi::v1::DeleteSnapshotResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ListSnapshots(_, _, A<csi::v1::ListSnapshotsResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, ControllerExpandVolume(
      _, _, A<csi::v1::ControllerExpandVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeStageVolume(
      _, _, A<csi::v1::NodeStageVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeUnstageVolume(
      _, _, A<csi::v1::NodeUnstageVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodePublishVolume(
      _, _, A<csi::v1::NodePublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeUnpublishVolume(
      _, _, A<csi::v1::NodeUnpublishVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeGetVolumeStats(
      _, _, A<csi::v1::NodeGetVolumeStatsResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  EXPECT_CALL(*this, NodeExpandVolume(
      _, _, A<csi::v1::NodeExpandVolumeResponse*>()))
    .WillRepeatedly(Return(Status::OK));

  // Advertise every node RPC capability by default for testing with the test
  // CSI plugin in forwarding mode.
  EXPECT_CALL(*this, NodeGetCapabilities(
      _, _, A<csi::v1::NodeGetCapabilitiesResponse*>()))
    .WillRepeatedly(Invoke([](
        ServerContext*,
        const csi::v1::NodeGetCapabilitiesRequest*,
        csi::v1::NodeGetCapabilitiesResponse* reply) {
      // Capabilities are appended in the order listed here.
      for (const auto type : {
          csi::v1::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME,
          csi::v1::NodeServiceCapability::RPC::GET_VOLUME_STATS,
          csi::v1::NodeServiceCapability::RPC::EXPAND_VOLUME}) {
        reply->add_capabilities()->mutable_rpc()->set_type(type);
      }

      return Status::OK;
    }));

  EXPECT_CALL(*this, NodeGetInfo(_, _, A<csi::v1::NodeGetInfoResponse*>()))
    .WillRepeatedly(Return(Status::OK));
}
// Builds and starts a gRPC server that exposes this mock as the CSI v0 and
// CSI v1 `Identity`, `Controller` and `Node` services.
//
// If `address` is set, the server listens on it with insecure credentials and
// the returned `Connection` targets that address. Otherwise no listening port
// is added and the returned `Connection` wraps an in-process channel of the
// server.
//
// Returns an `Error` if the server could not be built and started.
Try<Connection> MockCSIPlugin::startup(const Option<string>& address)
{
  const bool listening = address.isSome();

  ServerBuilder builder;

  if (listening) {
    builder.AddListeningPort(address.get(), InsecureServerCredentials());
  }

  // This object implements several service interfaces, so each registration
  // names the base it is registering through an explicit cast.
  builder.RegisterService(static_cast<csi::v0::Identity::Service*>(this));
  builder.RegisterService(static_cast<csi::v0::Controller::Service*>(this));
  builder.RegisterService(static_cast<csi::v0::Node::Service*>(this));
  builder.RegisterService(static_cast<csi::v1::Identity::Service*>(this));
  builder.RegisterService(static_cast<csi::v1::Controller::Service*>(this));
  builder.RegisterService(static_cast<csi::v1::Node::Service*>(this));

  server = builder.BuildAndStart();

  if (!server) {
    return Error("Unable to start a mock CSI plugin.");
  }

  if (listening) {
    return Connection(address.get());
  }

  return Connection(server->InProcessChannel(ChannelArguments()));
}
// Shuts down the gRPC server started by `startup()` and blocks until the
// server has finished shutting down.
//
// Returns an `Error` instead of dereferencing a null pointer if there is no
// server to shut down, i.e. `startup()` has not been called or it failed.
Try<Nothing> MockCSIPlugin::shutdown()
{
  // Within this file `server` is only assigned in `startup()`, and it is left
  // null when `BuildAndStart()` fails, so it cannot be assumed to be set here.
  if (!server) {
    return Error("Unable to shut down a mock CSI plugin that is not running.");
  }

  server->Shutdown();
  server->Wait();

  return Nothing();
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {