blob: aadebd3477f9207280e55f36f6f896da3b4fddf2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <mesos/type_utils.hpp>
#include <process/clock.hpp>
#include <process/gtest.hpp>
#include <process/gmock.hpp>
#include <stout/os.hpp>
#include <stout/json.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/os/realpath.hpp>
#include "common/http.hpp"
#include "csi/paths.hpp"
#include "internal/evolve.hpp"
#include "resource_provider/local.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
namespace http = process::http;
using std::list;
using std::string;
using std::vector;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using process::Clock;
using process::Future;
using process::Owned;
using process::PID;
using testing::Values;
using testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
constexpr char TEST_SLRP_TYPE[] = "org.apache.mesos.rp.local.storage";
constexpr char TEST_SLRP_NAME[] = "test";
class AgentResourceProviderConfigApiTest
: public ContainerizerTest<slave::MesosContainerizer>,
public WithParamInterface<ContentType>
{
public:
void SetUp() override
{
ContainerizerTest<slave::MesosContainerizer>::SetUp();
resourceProviderConfigDir =
path::join(sandbox.get(), "resource_provider_configs");
ASSERT_SOME(os::mkdir(resourceProviderConfigDir.get()));
}
void TearDown() override
{
foreach (const string& slaveWorkDir, slaveWorkDirs) {
// Clean up CSI endpoint directories if there is any.
const string csiRootDir = slave::paths::getCsiRootDir(slaveWorkDir);
Try<list<string>> csiContainerPaths =
csi::paths::getContainerPaths(csiRootDir, "*", "*");
ASSERT_SOME(csiContainerPaths);
foreach (const string& path, csiContainerPaths.get()) {
Try<csi::paths::ContainerPath> containerPath =
csi::paths::parseContainerPath(csiRootDir, path);
ASSERT_SOME(containerPath);
Result<string> endpointDir =
os::realpath(csi::paths::getEndpointDirSymlinkPath(
csiRootDir,
containerPath->type,
containerPath->name,
containerPath->containerId));
if (endpointDir.isSome()) {
ASSERT_SOME(os::rmdir(endpointDir.get()));
}
}
}
ContainerizerTest<slave::MesosContainerizer>::TearDown();
}
master::Flags CreateMasterFlags() override
{
master::Flags flags =
ContainerizerTest<slave::MesosContainerizer>::CreateMasterFlags();
// Use a small allocation interval to speed up the test. We do this instead
// of manipulating the clock because the storage local resource provider
// relies on a running clock to wait for the CSI plugin to be ready.
flags.allocation_interval = Milliseconds(50);
return flags;
}
slave::Flags CreateSlaveFlags() override
{
slave::Flags flags =
ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags();
flags.resource_provider_config_dir = resourceProviderConfigDir;
// Store the agent work directory for cleaning up CSI endpoint
// directories during teardown.
// NOTE: DO NOT change the work directory afterward.
slaveWorkDirs.push_back(flags.work_dir);
return flags;
}
ResourceProviderInfo createResourceProviderInfo(const string& volumes)
{
Option<ResourceProviderInfo> info;
// This extra closure is necessary in order to use `ASSERT_*`, as
// these macros require a void return type.
[&] {
// Randomize the plugin name so every created config uses its own CSI
// plugin instance. We use this to check if the config has been updated in
// some tests.
const string testCsiPluginName =
"local_" + strings::remove(id::UUID::random().toString(), "-");
const string testCsiPluginPath =
path::join(tests::flags.build_dir, "src", "test-csi-plugin");
const string testCsiPluginWorkDir =
path::join(sandbox.get(), testCsiPluginName);
ASSERT_SOME(os::mkdir(testCsiPluginWorkDir));
Try<string> resourceProviderConfig = strings::format(
R"~(
{
"type": "%s",
"name": "%s",
"default_reservations": [
{
"type": "DYNAMIC",
"role": "storage"
}
],
"storage": {
"plugin": {
"type": "org.apache.mesos.csi.test",
"name": "%s",
"containers": [
{
"services": [
"CONTROLLER_SERVICE",
"NODE_SERVICE"
],
"command": {
"shell": false,
"value": "%s",
"arguments": [
"%s",
"--available_capacity=0B",
"--volumes=%s",
"--work_dir=%s"
]
},
"resources": [
{
"name": "cpus",
"type": "SCALAR",
"scalar": {
"value": 0.1
}
},
{
"name": "mem",
"type": "SCALAR",
"scalar": {
"value": 1024
}
}
]
}
]
}
}
}
)~",
TEST_SLRP_TYPE,
TEST_SLRP_NAME,
testCsiPluginName,
testCsiPluginPath,
testCsiPluginPath,
volumes,
testCsiPluginWorkDir);
ASSERT_SOME(resourceProviderConfig);
Try<JSON::Object> json =
JSON::parse<JSON::Object>(resourceProviderConfig.get());
ASSERT_SOME(json);
Try<ResourceProviderInfo> _info =
::protobuf::parse<ResourceProviderInfo>(json.get());
ASSERT_SOME(_info);
info = _info.get();
}();
return info.get();
}
list<string> getResourceProviderConfigPaths()
{
list<string> result;
// This extra closure is necessary in order to use `ASSERT_*`, as
// these macros require a void return type.
[&] {
Try<list<string>> configFiles = os::ls(resourceProviderConfigDir.get());
ASSERT_SOME(configFiles);
foreach (const string& configFile, configFiles.get()) {
const string configPath =
path::join(resourceProviderConfigDir.get(), configFile);
// We only keep regular files to filter out the staging directory.
if (os::stat::isfile(configPath)) {
result.push_back(configPath);
}
}
}();
return result;
}
Future<http::Response> addResourceProviderConfig(
const PID<Slave>& pid,
const ContentType& contentType,
const ResourceProviderInfo& info)
{
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
agent::Call call;
call.set_type(agent::Call::ADD_RESOURCE_PROVIDER_CONFIG);
call.mutable_add_resource_provider_config()
->mutable_info()->CopyFrom(info);
return http::post(
pid,
"api/v1",
headers,
serialize(contentType, evolve(call)),
stringify(contentType));
}
Future<http::Response> updateResourceProviderConfig(
const PID<Slave>& pid,
const ContentType& contentType,
const ResourceProviderInfo& info)
{
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
agent::Call call;
call.set_type(agent::Call::UPDATE_RESOURCE_PROVIDER_CONFIG);
call.mutable_update_resource_provider_config()
->mutable_info()->CopyFrom(info);
return http::post(
pid,
"api/v1",
headers,
serialize(contentType, evolve(call)),
stringify(contentType));
}
Future<http::Response> removeResourceProviderConfig(
const PID<Slave>& pid,
const ContentType& contentType,
const string& type,
const string& name)
{
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
agent::Call call;
call.set_type(agent::Call::REMOVE_RESOURCE_PROVIDER_CONFIG);
call.mutable_remove_resource_provider_config()->set_type(type);
call.mutable_remove_resource_provider_config()->set_name(name);
return http::post(
pid,
"api/v1",
headers,
serialize(contentType, evolve(call)),
stringify(contentType));
}
protected:
vector<string> slaveWorkDirs;
Option<string> resourceProviderConfigDir;
};
// The tests are parameterized by the content type of the request.
INSTANTIATE_TEST_CASE_P(
ContentType,
AgentResourceProviderConfigApiTest,
Values(ContentType::PROTOBUF, ContentType::JSON));
// This test adds a new resource provider config on the fly.
TEST_P(AgentResourceProviderConfigApiTest, Add)
{
const ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
// Register a framework to wait for an offer having the provider
// resource.
FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
framework.set_roles(0, "storage");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// We use the following filter to filter offers that do not have
// wanted resources for 365 days (the maximum).
Filters declineFilters;
declineFilters.set_refuse_seconds(Days(365).secs());
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(DeclineOffers(declineFilters));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(&Resources::hasResourceProvider, lambda::_1))))
.WillOnce(FutureArg<1>(&offers));
driver.start();
// Add a new resource provider.
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::OK().status,
addResourceProviderConfig(slave.get()->pid, contentType, info));
// Check that a new config file is created.
list<string> configPaths = getResourceProviderConfigPaths();
EXPECT_EQ(1u, configPaths.size());
Try<string> read = os::read(configPaths.back());
ASSERT_SOME(read);
Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
ASSERT_SOME(json);
Try<ResourceProviderInfo> _info =
::protobuf::parse<ResourceProviderInfo>(json.get());
ASSERT_SOME(_info);
EXPECT_EQ(_info.get(), info);
// Wait for an offer having the provider resource.
AWAIT_READY(offers);
}
// This test checks that adding a resource provider config that is identical to
// an existing one is allowed due to idempotency.
TEST_P(AgentResourceProviderConfigApiTest, IdempotentAdd)
{
const ContentType contentType = GetParam();
Clock::pause();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
// Generate a pre-existing config.
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
ASSERT_SOME(os::write(
path::join(resourceProviderConfigDir.get(), "test.json"),
stringify(JSON::protobuf(info))));
slave::Flags slaveFlags = CreateSlaveFlags();
// Since the local resource provider daemon is started after the agent is
// registered, it is guaranteed that the slave will send two
// `UpdateSlaveMessage`s, where the latter one contains resources from the
// storage local resource provider.
// NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because GMock
// will search the expectations in reverse order.
Future<UpdateSlaveMessage> updateSlave2 =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Future<UpdateSlaveMessage> updateSlave1 =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
// Advance the clock to trigger agent registration and prevent retry.
Clock::advance(slaveFlags.registration_backoff_factor);
AWAIT_READY(updateSlave1);
// NOTE: We need to resume the clock so that the resource provider can
// periodically check if the CSI endpoint socket has been created by the
// plugin container, which runs in another Linux process.
Clock::resume();
AWAIT_READY(updateSlave2);
ASSERT_TRUE(updateSlave2->has_resource_providers());
ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
Clock::pause();
// No update message should be triggered by an idempotent add.
EXPECT_NO_FUTURE_PROTOBUFS(UpdateSlaveMessage(), _, _);
// Add a resource provider config that is identical to the existing one.
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::OK().status,
addResourceProviderConfig(slave.get()->pid, contentType, info));
Clock::settle();
}
// This test checks that adding a resource provider config that already
// exists is rejected.
TEST_P(AgentResourceProviderConfigApiTest, AddConflict)
{
const ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
// Generate a pre-existing config.
const string configPath =
path::join(resourceProviderConfigDir.get(), "test.json");
ResourceProviderInfo oldInfo = createResourceProviderInfo("volume1:4GB");
ASSERT_SOME(os::write(configPath, stringify(JSON::protobuf(oldInfo))));
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
ResourceProviderInfo newInfo = createResourceProviderInfo("volume1:2GB");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::Conflict().status,
addResourceProviderConfig(slave.get()->pid, contentType, newInfo));
// Check that no new config is created, and the existing one is not
// overwritten.
list<string> configPaths = getResourceProviderConfigPaths();
EXPECT_EQ(1u, configPaths.size());
EXPECT_EQ(configPath, configPaths.back());
Try<string> read = os::read(configPath);
ASSERT_SOME(read);
Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
ASSERT_SOME(json);
Try<ResourceProviderInfo> _info =
::protobuf::parse<ResourceProviderInfo>(json.get());
ASSERT_SOME(_info);
EXPECT_EQ(_info.get(), oldInfo);
}
// This test checks that adding an invalid resource provider config is rejected.
TEST_P(AgentResourceProviderConfigApiTest, AddInvalid)
{
const ContentType contentType = GetParam();
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
// Generate an invalid config by clearing the `storage` field, which is
// required by the storage local resource provider.
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
info.clear_storage();
ASSERT_SOME(LocalResourceProvider::validate(info));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::BadRequest().status,
updateResourceProviderConfig(slave.get()->pid, contentType, info));
// Check that no new config is created.
list<string> configPaths = getResourceProviderConfigPaths();
EXPECT_TRUE(configPaths.empty());
}
// This test updates an existing resource provider config on the fly.
TEST_P(AgentResourceProviderConfigApiTest, Update)
{
const ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
// Generate a pre-existing config.
const string volumes = "volume1:4GB";
const string configPath =
path::join(resourceProviderConfigDir.get(), "test.json");
ASSERT_SOME(os::write(
configPath,
stringify(JSON::protobuf(createResourceProviderInfo(volumes)))));
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
// Register a framework to wait for an offer having the provider
// resource.
FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
framework.set_roles(0, "storage");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// We use the following filter to filter offers that do not have
// wanted resources for 365 days (the maximum).
Filters declineFilters;
declineFilters.set_refuse_seconds(Days(365).secs());
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(DeclineOffers(declineFilters));
Future<vector<Offer>> oldOffers;
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
&Resources::hasResourceProvider)))
.WillOnce(FutureArg<1>(&oldOffers));
driver.start();
// Wait for an offer having the old provider resource.
AWAIT_READY(oldOffers);
ASSERT_EQ(1u, oldOffers->size());
Resource oldResource = *Resources(oldOffers->at(0).resources())
.filter(&Resources::hasResourceProvider)
.begin();
ASSERT_TRUE(oldResource.has_disk());
ASSERT_TRUE(oldResource.disk().has_source());
ASSERT_TRUE(oldResource.disk().source().has_vendor());
Future<OfferID> rescinded;
EXPECT_CALL(sched, offerRescinded(&driver, oldOffers->at(0).id()))
.WillOnce(FutureArg<1>(&rescinded));
Future<vector<Offer>> newOffers;
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(&Resources::hasResourceProvider, lambda::_1))))
.WillOnce(FutureArg<1>(&newOffers));
// Create a new config that serves the same volumes with a different vendor.
ResourceProviderInfo info = createResourceProviderInfo(volumes);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::OK().status,
updateResourceProviderConfig(slave.get()->pid, contentType, info));
// Check that no new config is created, and the existing one is overwritten.
list<string> configPaths = getResourceProviderConfigPaths();
EXPECT_EQ(1u, configPaths.size());
EXPECT_EQ(configPath, configPaths.back());
Try<string> read = os::read(configPath);
ASSERT_SOME(read);
Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
ASSERT_SOME(json);
Try<ResourceProviderInfo> _info =
::protobuf::parse<ResourceProviderInfo>(json.get());
ASSERT_SOME(_info);
EXPECT_EQ(_info.get(), info);
// Wait for the old offer to be rescinded.
AWAIT_READY(rescinded);
// Wait for an offer having the new provider resource.
AWAIT_READY(newOffers);
ASSERT_EQ(1u, newOffers->size());
Resource newResource = *Resources(newOffers->at(0).resources())
.filter(&Resources::hasResourceProvider)
.begin();
ASSERT_TRUE(newResource.has_disk());
ASSERT_TRUE(newResource.disk().has_source());
ASSERT_TRUE(newResource.disk().source().has_vendor());
// The resource from the new resource provider has the same provider ID but a
// different vendor as that from the old one.
EXPECT_EQ(oldResource.provider_id(), newResource.provider_id());
EXPECT_NE(
oldResource.disk().source().vendor(),
newResource.disk().source().vendor());
}
// This test checks that updating an existing resource provider config with an
// identical one will not relaunch the resource provider due to idempotency.
TEST_P(AgentResourceProviderConfigApiTest, IdempotentUpdate)
{
const ContentType contentType = GetParam();
Clock::pause();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
// Generate a pre-existing config.
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
ASSERT_SOME(os::write(
path::join(resourceProviderConfigDir.get(), "test.json"),
stringify(JSON::protobuf(info))));
slave::Flags slaveFlags = CreateSlaveFlags();
// Since the local resource provider daemon is started after the agent is
// registered, it is guaranteed that the slave will send two
// `UpdateSlaveMessage`s, where the latter one contains resources from the
// storage local resource provider.
// NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because GMock
// will search the expectations in reverse order.
Future<UpdateSlaveMessage> updateSlave2 =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Future<UpdateSlaveMessage> updateSlave1 =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
// Advance the clock to trigger agent registration and prevent retry.
Clock::advance(slaveFlags.registration_backoff_factor);
AWAIT_READY(updateSlave1);
// NOTE: We need to resume the clock so that the resource provider can
// periodically check if the CSI endpoint socket has been created by the
// plugin container, which runs in another Linux process.
Clock::resume();
AWAIT_READY(updateSlave2);
ASSERT_TRUE(updateSlave2->has_resource_providers());
ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
Clock::pause();
// No update message should be triggered by an idempotent update.
EXPECT_NO_FUTURE_PROTOBUFS(UpdateSlaveMessage(), _, _);
// Update the resource provider with an identical config.
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::OK().status,
updateResourceProviderConfig(slave.get()->pid, contentType, info));
Clock::settle();
}
// This test checks that updating a nonexistent resource provider config is
// rejected.
TEST_P(AgentResourceProviderConfigApiTest, UpdateConflict)
{
const ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::Conflict().status,
updateResourceProviderConfig(slave.get()->pid, contentType, info));
// Check that no new config is created.
list<string> configPaths = getResourceProviderConfigPaths();
EXPECT_TRUE(configPaths.empty());
}
// This test checks that updating with an invalid resource provider config is
// rejected.
TEST_P(AgentResourceProviderConfigApiTest, UpdateInvalid)
{
const ContentType contentType = GetParam();
master::Flags masterFlags = CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
// Generate a pre-existing config.
const string configPath =
path::join(resourceProviderConfigDir.get(), "test.json");
ResourceProviderInfo oldInfo = createResourceProviderInfo("volume1:4GB");
ASSERT_SOME(os::write(configPath, stringify(JSON::protobuf(oldInfo))));
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
// Register a framework to wait for an offer having the provider resource.
FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
framework.set_roles(0, "storage");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// We use the following filter to filter offers that do not have
// wanted resources for 365 days (the maximum).
Filters declineFilters;
declineFilters.set_refuse_seconds(Days(365).secs());
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(DeclineOffers(declineFilters));
Future<vector<Offer>> oldOffers;
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(&Resources::hasResourceProvider, lambda::_1))))
.WillOnce(FutureArg<1>(&oldOffers));
driver.start();
// Wait for an offer having the old provider resource.
AWAIT_READY(oldOffers);
ASSERT_FALSE(oldOffers->empty());
// Generate an invalid config by clearing the `storage` field, which is
// required by the storage local resource provider.
ResourceProviderInfo newInfo = oldInfo;
newInfo.clear_storage();
ASSERT_SOME(LocalResourceProvider::validate(newInfo));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::BadRequest().status,
updateResourceProviderConfig(slave.get()->pid, contentType, newInfo));
// Check that no new config is created, and the existing one is not
// overwritten.
list<string> configPaths = getResourceProviderConfigPaths();
EXPECT_EQ(1u, configPaths.size());
EXPECT_EQ(configPath, configPaths.back());
Try<string> read = os::read(configPath);
ASSERT_SOME(read);
Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
ASSERT_SOME(json);
Try<ResourceProviderInfo> _info =
::protobuf::parse<ResourceProviderInfo>(json.get());
ASSERT_SOME(_info);
EXPECT_EQ(_info.get(), oldInfo);
}
// This test removes an existing resource provider config on the fly.
TEST_P(AgentResourceProviderConfigApiTest, Remove)
{
const ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
// Generate a pre-existing config.
const string configPath =
path::join(resourceProviderConfigDir.get(), "test.json");
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
ASSERT_SOME(os::write(configPath, stringify(JSON::protobuf(info))));
slave::Flags slaveFlags = CreateSlaveFlags();
slave::Fetcher fetcher(slaveFlags);
Try<slave::MesosContainerizer*> _containerizer =
slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
ASSERT_SOME(_containerizer);
Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave =
StartSlave(detector.get(), containerizer.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
// Register a framework to wait for an offer having the provider
// resource.
FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
framework.set_roles(0, "storage");
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
EXPECT_CALL(sched, registered(&driver, _, _));
// We use the following filter to filter offers that do not have
// wanted resources for 365 days (the maximum).
Filters declineFilters;
declineFilters.set_refuse_seconds(Days(365).secs());
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillRepeatedly(DeclineOffers(declineFilters));
Future<vector<Offer>> oldOffers;
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(&Resources::hasResourceProvider, lambda::_1))))
.WillOnce(FutureArg<1>(&oldOffers));
driver.start();
// Wait for an offer having the old provider resource.
AWAIT_READY(oldOffers);
ASSERT_FALSE(oldOffers->empty());
Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
// Check that there is one plugin container running.
AWAIT_READY(pluginContainers);
ASSERT_EQ(1u, pluginContainers->size());
Future<Option<mesos::slave::ContainerTermination>> pluginTermination =
containerizer->wait(*pluginContainers->begin());
Future<OfferID> rescinded;
EXPECT_CALL(sched, offerRescinded(&driver, oldOffers->at(0).id()))
.WillOnce(FutureArg<1>(&rescinded));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::OK().status,
removeResourceProviderConfig(
slave.get()->pid, contentType, info.type(), info.name()));
// Check that the existing config is removed.
EXPECT_FALSE(os::exists(configPath));
// Wait for the old offer to be rescinded.
AWAIT_READY(rescinded);
// Check that the plugin container has been destroyed.
AWAIT_READY(pluginTermination);
}
// This test checks that removing a nonexistent resource provider config is
// allowed due to idempotency.
TEST_P(AgentResourceProviderConfigApiTest, IdempotentRemove)
{
const ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(
http::OK().status,
removeResourceProviderConfig(
slave.get()->pid, contentType, info.type(), info.name()));
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {