blob: 2809847566d79da9ce85f0b3f1979d327a8936c0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include <string>
#include <tuple>
#include <vector>
#include <mesos/csi/types.hpp>
#include <mesos/csi/v0.hpp>
#include <mesos/module/disk_profile_adaptor.hpp>
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/owned.hpp>
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
#include <stout/hashset.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
#include <stout/os/write.hpp>
#include "module/manager.hpp"
#include "resource_provider/storage/disk_profile_utils.hpp"
#include "resource_provider/storage/uri_disk_profile_adaptor.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
#include "tests/disk_profile_server.hpp"
#include "tests/utils.hpp"
using namespace process;
using std::map;
using std::shared_ptr;
using std::string;
using std::tuple;
using std::vector;
using google::protobuf::Map;
using mesos::resource_provider::DiskProfileMapping;
using testing::_;
using testing::DoAll;
using testing::Return;
namespace mesos {
namespace internal {
namespace tests {
constexpr char URI_DISK_PROFILE_ADAPTOR_NAME[] =
"org_apache_mesos_UriDiskProfileAdaptor";
class UriDiskProfileAdaptorTest : public MesosTest
{
public:
void SetUp() override
{
MesosTest::SetUp();
string libraryPath = getModulePath("uri_disk_profile_adaptor");
Modules::Library* library = modules.add_libraries();
library->set_name("uri_disk_profile_adaptor");
library->set_file(libraryPath);
Modules::Library::Module* module = library->add_modules();
module->set_name(URI_DISK_PROFILE_ADAPTOR_NAME);
ASSERT_SOME(modules::ModuleManager::load(modules));
}
void TearDown() override
{
foreach (const Modules::Library& library, modules.libraries()) {
foreach (const Modules::Library::Module& module, library.modules()) {
if (module.has_name()) {
ASSERT_SOME(modules::ModuleManager::unload(module.name()));
}
}
}
MesosTest::TearDown();
}
protected:
Modules modules;
};
// Exercises the disk profile map parsing method with the example found
// in the UriDiskProfileAdaptor module's help string.
TEST_F(UriDiskProfileAdaptorTest, ParseExample)
{
const string example = R"~(
{
"profile_matrix" : {
"my-profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test"
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode" : "SINGLE_NODE_WRITER" }
},
"create_parameters" : {
"mesos-does-not" : "interpret-these",
"type" : "raid5",
"stripes" : "3",
"stripesize" : "64"
}
}
}
})~";
Try<DiskProfileMapping> parsed =
mesos::internal::storage::parseDiskProfileMapping(example);
ASSERT_SOME(parsed);
const string key = "my-profile";
ASSERT_EQ(1u, parsed->profile_matrix().count(key));
csi::v0::VolumeCapability capability =
parsed->profile_matrix().at(key).volume_capabilities();
ASSERT_TRUE(capability.has_block());
ASSERT_TRUE(capability.has_access_mode());
ASSERT_EQ(
csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER,
capability.access_mode().mode());
Map<string, string> parameters =
parsed->profile_matrix().at(key).create_parameters();
ASSERT_EQ(4u, parameters.size());
ASSERT_EQ(1u, parameters.count("mesos-does-not"));
ASSERT_EQ(1u, parameters.count("type"));
ASSERT_EQ(1u, parameters.count("stripes"));
ASSERT_EQ(1u, parameters.count("stripesize"));
ASSERT_EQ("interpret-these", parameters.at("mesos-does-not"));
ASSERT_EQ("raid5", parameters.at("type"));
ASSERT_EQ("3", parameters.at("stripes"));
ASSERT_EQ("64", parameters.at("stripesize"));
}
// Exercises the disk profile map parsing method with some slightly incorrect
// inputs. Each item in the array of examples should error at a different area
// of the code (and are ordered corresponding to the code as well).
TEST_F(UriDiskProfileAdaptorTest, ParseInvalids)
{
const vector<string> examples = {
"Not an object, but still JSON",
R"~({
"profile_matrix" : {
"profile" : "This is not an object"
}
})~",
// Missing one of 'resource_provider_selector' or
// 'csi_plugin_type_selector'.
R"~({
"profile_matrix" : {
"profile" : {}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : "Wrong JSON type"
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : "Wrong JSON type"
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"not-type" : "Missing required key"
}
]
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"type" : "org.apache.mesos.rp.local.storage",
"not-name" : "Missing required key"
}
]
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : "Wrong JSON type"
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"not-plugin_type" : "Missing required key",
}
}
}
})~",
// More than one selector.
R"~({
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"type" : "org.apache.mesos.rp.local.storage",
"name" : "test"
}
]
},
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"not-volume_capabilities" : "Missing required key"
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : "Wrong JSON type"
}
}
})~",
// Missing one of 'block' or 'mount'.
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : {}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : {
"mount" : {
"fs_type" : [ "This should not be an array" ]
}
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode": "No-enum-of-this-name" }
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : {
"mount" : {
"mount_flags" : [ "a", "b", "c" ]
},
"access_mode" : { "mode": "SINGLE_NODE_WRITER" }
},
"create_parameters" : "Wrong JSON type"
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : {
"mount" : { "fs_type" : "abc" },
"access_mode" : { "mode": "SINGLE_NODE_READER_ONLY" }
},
"create_parameters" : {
"incorrect" : [ "JSON type of parameter" ]
}
}
}
})~",
R"~({
"profile_matrix" : {
"profile" : {
"csi_plugin_type_selector" : {
"plugin_type" : "org.apache.mesos.csi.test",
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode": "MULTI_NODE_READER_ONLY" }
}
},
"first profile is fine, second profile is broken" : {}
}
})~",
};
hashset<string> errors;
for (size_t i = 0; i < examples.size(); i++) {
Try<DiskProfileMapping> parsed =
mesos::internal::storage::parseDiskProfileMapping(examples[i]);
ASSERT_ERROR(parsed) << examples[i];
ASSERT_EQ(0u, errors.count(parsed.error())) << parsed.error();
errors.insert(parsed.error());
}
}
// This creates a UriDiskProfileAdaptor module configured to read from a
// file and tests the basic `watch` -> `translate` workflow which
// callers of the module are expected to follow.
TEST_F(UriDiskProfileAdaptorTest, FetchFromFile)
{
Clock::pause();
const string contents =R"~(
{
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"type" : "resource_provider_type",
"name" : "resource_provider_name"
}
]
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode": "MULTI_NODE_SINGLE_WRITER" }
}
}
}
})~";
const string profileName = "profile";
const string profileFile = path::join(sandbox.get(), "profiles.json");
const Duration pollInterval = Seconds(10);
ResourceProviderInfo resourceProviderInfo;
resourceProviderInfo.set_type("resource_provider_type");
resourceProviderInfo.set_name("resource_provider_name");
Parameters params;
Parameter* pollIntervalFlag = params.add_parameter();
pollIntervalFlag->set_key("poll_interval");
pollIntervalFlag->set_value(stringify(pollInterval));
// NOTE: We cannot use the `file://` URI to specify the file location,
// otherwise the file contents will be prematurely read. Therefore, we
// specify the absolute path of the file in the `uri` flag.
Parameter* uriFlag = params.add_parameter();
uriFlag->set_key("uri");
uriFlag->set_value(profileFile);
// Create the module before we've written anything to the file.
// This means the first poll will fail, so the module believes there
// are no profiles at the moment.
Try<DiskProfileAdaptor*> _module =
modules::ModuleManager::create<DiskProfileAdaptor>(
URI_DISK_PROFILE_ADAPTOR_NAME,
params);
ASSERT_SOME(_module);
Owned<DiskProfileAdaptor> module(_module.get());
// Start watching for updates.
// By the time this returns, we'll know that the first poll has finished
// because when the module reads from file, it does so immediately upon
// being initialized.
Future<hashset<string>> future =
module->watch(hashset<string>::EMPTY, resourceProviderInfo);
// Write the single profile to the file.
ASSERT_SOME(os::write(profileFile, contents));
// Trigger the next poll.
Clock::advance(pollInterval);
AWAIT_ASSERT_READY(future);
ASSERT_EQ(1u, future->size());
EXPECT_EQ(profileName, *(future->begin()));
// Translate the profile name into the profile mapping.
Future<DiskProfileAdaptor::ProfileInfo> mapping =
module->translate(profileName, resourceProviderInfo);
AWAIT_ASSERT_READY(mapping);
ASSERT_TRUE(mapping->capability.has_block());
ASSERT_EQ(
csi::types::VolumeCapability::AccessMode::MULTI_NODE_SINGLE_WRITER,
mapping->capability.access_mode().mode());
Clock::resume();
}
// This creates a UriDiskProfileAdaptor module configured to read from
// an HTTP URI. The HTTP server will return a different profile mapping
// between each of the calls. We expect the module to ignore the third
// call because the module does not allow profiles to be mutated. This
// is not a fatal error however, as the HTTP server can be "fixed"
// without restarting the agent.
TEST_F(UriDiskProfileAdaptorTest, FetchFromHTTP)
{
Clock::pause();
const string contents1 =
R"~(
{
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"type" : "resource_provider_type",
"name" : "resource_provider_name"
}
]
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode": "SINGLE_NODE_WRITER" }
}
}
}
}
)~";
const string contents2 =
R"~(
{
"profile_matrix" : {
"another-profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"type" : "resource_provider_type",
"name" : "resource_provider_name"
}
]
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode": "MULTI_NODE_MULTI_WRITER" }
}
}
}
}
)~";
const string contents3 =
R"~(
{
"profile_matrix" : {
"profile" : {
"resource_provider_selector" : {
"resource_providers" : [
{
"type" : "resource_provider_type",
"name" : "resource_provider_name"
}
]
},
"volume_capabilities" : {
"block" : {},
"access_mode" : { "mode": "MULTI_NODE_MULTI_WRITER" }
}
}
}
}
)~";
const Duration pollInterval = Seconds(10);
ResourceProviderInfo resourceProviderInfo;
resourceProviderInfo.set_type("resource_provider_type");
resourceProviderInfo.set_name("resource_provider_name");
Future<shared_ptr<TestDiskProfileServer>> server =
TestDiskProfileServer::create();
AWAIT_READY(server);
// We need to intercept this call since the module is expected to
// ignore the result of the third call.
Future<Nothing> thirdCall;
EXPECT_CALL(*server.get()->process, profiles(_))
.WillOnce(Return(http::OK(contents1)))
.WillOnce(Return(http::OK(contents2)))
.WillOnce(DoAll(FutureSatisfy(&thirdCall), Return(http::OK(contents3))))
.WillOnce(Return(http::OK(contents1)));
Parameters params;
Parameter* pollIntervalFlag = params.add_parameter();
pollIntervalFlag->set_key("poll_interval");
pollIntervalFlag->set_value(stringify(pollInterval));
Parameter* uriFlag = params.add_parameter();
uriFlag->set_key("uri");
uriFlag->set_value(stringify(server.get()->process->url()));
Try<DiskProfileAdaptor*> _module =
modules::ModuleManager::create<DiskProfileAdaptor>(
URI_DISK_PROFILE_ADAPTOR_NAME,
params);
ASSERT_SOME(_module);
Owned<DiskProfileAdaptor> module(_module.get());
// Wait for the first HTTP poll to complete.
Future<hashset<string>> future =
module->watch(hashset<string>::EMPTY, resourceProviderInfo);
AWAIT_READY(future);
ASSERT_EQ(1u, future->size());
EXPECT_EQ("profile", *future->begin());
// Start watching for an update to the list of profiles.
future = module->watch({"profile"}, resourceProviderInfo);
// Trigger the second HTTP poll.
Clock::advance(pollInterval);
AWAIT_READY(future);
ASSERT_EQ(1u, future->size());
EXPECT_EQ("another-profile", *future->begin());
// Watching for another update to the list of profiles.
future = module->watch({"another-profile"}, resourceProviderInfo);
Future<Nothing> contentsPolled =
FUTURE_DISPATCH(_, &storage::UriDiskProfileAdaptorProcess::_poll);
// Trigger the third HTTP poll.
Clock::advance(pollInterval);
AWAIT_READY(thirdCall);
// Ensure that the polling has actually completed.
AWAIT_READY(contentsPolled);
// We don't expect the module to notify watcher(s) because the server's
// response is considered invalid (the module does not allow profiles
// to be renamed).
Clock::settle();
ASSERT_TRUE(future.isPending());
// Trigger the fourth HTTP poll.
Clock::advance(pollInterval);
// This time, the server's response includes the correct first profile.
AWAIT_READY(future);
ASSERT_EQ(1u, future->size());
EXPECT_EQ("profile", *future->begin());
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {