blob: ac16fe744ec2a8d4b4af005b91a5560f0b2f3ef3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <mesos/mesos.hpp>
#include <process/async.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include "tests/mesos.hpp"
using mesos::authorization::VIEW_EXECUTOR;
using mesos::authorization::VIEW_FLAGS;
using mesos::authorization::VIEW_FRAMEWORK;
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::VIEW_TASK;
using mesos::internal::slave::Containerizer;
using mesos::internal::slave::Fetcher;
using mesos::internal::slave::MesosContainerizer;
using mesos::master::detector::MasterDetector;
using process::async;
using process::Clock;
using process::delay;
using process::Future;
using process::Message;
using process::Owned;
using process::PID;
using process::Promise;
using process::Time;
using process::http::Response;
using process::http::Request;
using process::http::Headers;
using testing::SaveArg;
// The tests in this file are designed to verify that the caching
// of read-only requests inside a Mesos master is implemented correctly,
// i.e. cacheable requests are cached and non-cacheable requests will
// return different responses.
namespace mesos {
namespace internal {
namespace tests {
class BlockingAuthorizer;
class MasterLoadTest : public MesosTest {
protected:
// Describes a given HTTP request.
struct RequestDescriptor {
std::string endpoint;
std::string principal;
std::string query;
process::http::Headers headers;
bool operator<(const RequestDescriptor& other) const;
};
// Prepare a mock cluster with 1 master, 1 agent and 1 framework,
// with the given authorizer being wrapped in a `BlockingAuthorizer`.
void prepareCluster(Authorizer* authorizer);
// This function launches a fixed number of equivalent requests per passed
// request descriptor, while manipulating the master in order to
// ensure all requests will appear consecutively in the master queue.
// The returned map associates each response with the descriptor it was
// created from.
std::multimap<RequestDescriptor, Future<Response>>
launchSimultaneousRequests(
const std::vector<RequestDescriptor>& descriptors);
// The "mock cluster" created by `prepareCluster()`. These are `protected`
// so that the test body can access them if required.
Owned<BlockingAuthorizer> authorizer_;
Owned<cluster::Master> master_;
Owned<MasterDetector> detector_;
Owned<MockScheduler> scheduler_;
Owned<TestingMesosSchedulerDriver> driver_;
Owned<cluster::Slave> slave_;
FrameworkID frameworkId_;
};
// This authorizer will not satisfy any futures from `getObjectApprover()`
// until it is told to, presumably from the test body.
//
// It effectively acts as a giant gate for certain requests.
class BlockingAuthorizerProcess
: public process::Process<BlockingAuthorizerProcess>
{
public:
BlockingAuthorizerProcess(Authorizer* underlying)
: ProcessBase(process::ID::generate("blocking-authorizer")),
underlying_(underlying),
blocked_(true) {}
Future<bool> authorized(const authorization::Request& request)
{
return underlying_->authorized(request);
}
Future<Owned<ObjectApprover>> getObjectApprover(
const Option<authorization::Subject>& subject,
const authorization::Action& action)
{
Future<Owned<ObjectApprover>> future =
underlying_->getObjectApprover(subject, action);
if (!blocked_) {
return future;
}
// The future is linked to the returned promise in `unleash()`.
futures_.push(future);
promises_.emplace();
return promises_.back().future();
}
Future<size_t> pending()
{
return promises_.size();
}
// Satisfies all future and prior calls made to `getObjectApprover`.
Future<Nothing> unleash()
{
CHECK_EQ(promises_.size(), futures_.size());
while (!promises_.empty()) {
promises_.front().associate(futures_.front());
futures_.pop();
promises_.pop();
}
blocked_ = false;
return Nothing();
}
private:
Authorizer* underlying_;
std::queue<Future<Owned<ObjectApprover>>> futures_;
std::queue<Promise<Owned<ObjectApprover>>> promises_;
bool blocked_;
};
class BlockingAuthorizer : public Authorizer
{
public:
BlockingAuthorizer(Authorizer* underlying)
: process_(new BlockingAuthorizerProcess(underlying))
{
process::spawn(process_.get());
}
~BlockingAuthorizer() override
{
process::terminate(process_.get());
process::wait(process_.get());
}
Future<bool> authorized(const authorization::Request& request) override
{
return process::dispatch(
process_.get(),
&BlockingAuthorizerProcess::authorized,
request);
}
Future<Owned<ObjectApprover>> getObjectApprover(
const Option<authorization::Subject>& subject,
const authorization::Action& action) override
{
return process::dispatch(
process_.get(),
&BlockingAuthorizerProcess::getObjectApprover,
subject,
action);
}
Future<size_t> pending()
{
return process::dispatch(
process_.get(),
&BlockingAuthorizerProcess::pending);
}
Future<Nothing> unleash()
{
return process::dispatch(
process_.get(),
&BlockingAuthorizerProcess::unleash);
}
private:
Owned<BlockingAuthorizerProcess> process_;
};
void MasterLoadTest::prepareCluster(Authorizer* authorizer)
{
// Start a master.
authorizer_.reset(new BlockingAuthorizer(authorizer));
master::Flags masterFlags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(
authorizer_.get(), masterFlags);
ASSERT_SOME(master);
master_ = master.get();
detector_ = master_->createDetector();
Future<FrameworkRegisteredMessage> frameworkRegisteredMessage =
FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _);
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
// Start a framework.
scheduler_.reset(new MockScheduler());
driver_.reset(new TestingMesosSchedulerDriver(
scheduler_.get(), detector_.get()));
EXPECT_CALL(*scheduler_, registered(driver_.get(), _, _))
.WillOnce(SaveArg<1>(&frameworkId_));
driver_->start();
AWAIT_READY(frameworkRegisteredMessage);
// Start an agent.
slave::Flags slaveFlags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector_.get(), slaveFlags);
ASSERT_SOME(slave);
slave_ = slave.get();
AWAIT_READY(slaveRegisteredMessage);
}
std::multimap<MasterLoadTest::RequestDescriptor, Future<Response>>
MasterLoadTest::launchSimultaneousRequests(
const std::vector<RequestDescriptor>& descriptors)
{
// NOTE: On Mac, the default number of open files (and thus tcp connections)
// is limited to 256 by default, so this number is tweaked to stay slightly
// lower than that at 40*5==200 connections for the most demanding test.
const size_t REQUESTS_PER_DESCRIPTOR = 40;
const size_t totalRequests = REQUESTS_PER_DESCRIPTOR * descriptors.size();
std::multimap<RequestDescriptor, Future<Response>> requests;
// Need this wrapper since `AWAIT_READY()` expects a `void` return type.
[&] {
// Send out all http requests based on the specifications
// found in `descriptors` and store the result in `requests`.
foreach (const RequestDescriptor& descriptor, descriptors) {
for (size_t i=0; i < REQUESTS_PER_DESCRIPTOR; ++i) {
Future<Response> response = process::http::get(
master_->pid,
descriptor.endpoint,
descriptor.query,
descriptor.headers);
requests.emplace(descriptor, response);
}
}
// Wait until all the HTTP events have reached the master and are now
// awaiting authorization. There might be some other requests that get
// mixed into the authorizer, so we must have ample requests in the
// test body to ensure cache hits.
Time whileLoopStartTime = Clock::now();
Future<size_t> pendingHttpCalls;
do {
pendingHttpCalls = authorizer_->pending();
AWAIT_READY(pendingHttpCalls);
// Protect against a potential infinite loop introduced by future bugs.
ASSERT_TRUE(Clock::now() - whileLoopStartTime < Seconds(20));
} while (pendingHttpCalls.get() < totalRequests);
// Now block the master actor, since we don't want the master to start
// batching until it is queued up with all the HTTP requests.
// NOTE: This function might be out of scope when the dispatch is
// scheduled, so we need to pass `masterBlocker` by value.
auto masterBlocker = std::make_shared<Promise<Nothing>>();
process::dispatch(master_->pid, [masterBlocker]() {
masterBlocker->future().await();
});
// Unblock the BlockingAuthorizer.
// This should trigger all the deferrals onto the master from the
// Authorizer's thread. When this future completes, the master's queue
// should be full of batched requests.
AWAIT_READY(authorizer_->unleash());
// Unblock the master now, so it can perform the batching.
masterBlocker->set(Nothing());
}();
return requests;
}
bool MasterLoadTest::RequestDescriptor::operator<(
const RequestDescriptor& other) const
{
return endpoint < other.endpoint;
}
// Test that simultaneous responses to various different endpoints
// all return the expected result.
TEST_F(MasterLoadTest, SimultaneousBatchedRequests)
{
MockAuthorizer mockAuthorizer;
prepareCluster(&mockAuthorizer);
// Set up the actual test.
RequestDescriptor descriptor1;
descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
descriptor1.endpoint = "/state";
RequestDescriptor descriptor2 = descriptor1;
descriptor2.endpoint = "/state-summary";
RequestDescriptor descriptor3 = descriptor1;
descriptor3.endpoint = "/frameworks";
RequestDescriptor descriptor4 = descriptor1;
descriptor4.endpoint = "/slaves";
RequestDescriptor descriptor5 = descriptor1;
descriptor5.endpoint = "/roles";
auto responses = launchSimultaneousRequests(
{descriptor1, descriptor2, descriptor3, descriptor4, descriptor5});
foreachpair (
const RequestDescriptor& request,
Future<Response>& response,
responses)
{
AWAIT_READY(response);
mesos::internal::master::Master* master = master_->master.get();
mesos::internal::master::Master::ReadOnlyHandler readOnlyHandler(master);
// TODO(bevers): Ideally we would not use HTTP at all to generate
// the reference response, but some master-internal function
// like `model(Summary<Master>)`.
Try<hashmap<std::string, std::string>> queryParameters_ =
process::http::query::decode(request.query);
ASSERT_SOME(queryParameters_);
hashmap<std::string, std::string> queryParameters = queryParameters_.get();
process::http::authentication::Principal principal(request.principal);
MockAuthorizer authorizer;
Owned<ObjectApprovers> approvers = ObjectApprovers::create(
&authorizer,
principal,
{VIEW_ROLE, VIEW_FLAGS, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
.get();
Response reference;
if (request.endpoint == "/state") {
reference = readOnlyHandler.state(queryParameters, approvers);
} else if (request.endpoint == "/state-summary") {
reference = readOnlyHandler.stateSummary(queryParameters, approvers);
} else if (request.endpoint == "/roles") {
reference = readOnlyHandler.roles(queryParameters, approvers);
} else if (request.endpoint == "/frameworks") {
reference = readOnlyHandler.frameworks(queryParameters, approvers);
} else if (request.endpoint == "/slaves") {
reference = readOnlyHandler.slaves(queryParameters, approvers);
} else {
UNREACHABLE();
}
EXPECT_EQ(reference.body, response->body);
}
// Ensure that we actually hit the metrics code path while executing
// the test.
JSON::Object metrics = Metrics();
ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
ASSERT_GT(
metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
0u);
}
// Test that simultaneous requests on a single endpoint for two
// different principals return different results.
TEST_F(MasterLoadTest, Principals)
{
// Set up a proper authorizer for this test.
master::Flags flags = CreateMasterFlags();
{
// Default principal is allowed to view frameworks.
mesos::ACL::ViewFramework* acl = flags.acls->add_view_frameworks();
acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
acl->mutable_users()->set_type(mesos::ACL::Entity::ANY);
}
{
// Default principal 2 is not allowed to view frameworks.
mesos::ACL::ViewFramework* acl = flags.acls->add_view_frameworks();
acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
acl->mutable_users()->set_type(mesos::ACL::Entity::NONE);
}
Authorizer* localAuthorizer = Authorizer::create(flags.acls.get()).get();
prepareCluster(localAuthorizer);
// Set up the requests with correct principals.
RequestDescriptor descriptor1;
descriptor1.endpoint = "/frameworks";
descriptor1.principal = DEFAULT_CREDENTIAL.principal();
descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
RequestDescriptor descriptor2 = descriptor1;
descriptor2.principal = DEFAULT_CREDENTIAL_2.principal();
descriptor2.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
auto responses = launchSimultaneousRequests({descriptor1, descriptor2});
JSON::Value expected = JSON::parse(
"{"
"\"frameworks\": [{"
"\"id\": \"" + stringify(frameworkId_) + "\""
"}]"
"}"
).get();
foreachpair (
const RequestDescriptor& request,
Future<Response>& response,
responses)
{
AWAIT_READY(response);
Try<JSON::Value> jsonResponse = JSON::parse(response->body);
ASSERT_SOME(jsonResponse);
if (request.principal == DEFAULT_CREDENTIAL.principal()) {
EXPECT_TRUE(jsonResponse->contains(expected))
<< "Principal " << request.principal
<< " got HTTP response: " << response->body;
} else {
EXPECT_FALSE(jsonResponse->contains(expected))
<< "Principal " << request.principal
<< " got HTTP response: " << response->body;
}
}
// Ensure that we actually hit the metrics code path while executing
// the test.
JSON::Object metrics = Metrics();
ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
ASSERT_GT(
metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
0u);
}
// Test that simultaneous requests on a single endpoint with
// different query parameters produce different results.
TEST_F(MasterLoadTest, QueryParameters)
{
MockAuthorizer mockAuthorizer;
prepareCluster(&mockAuthorizer);
RequestDescriptor descriptor1;
descriptor1.endpoint = "/frameworks";
descriptor1.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
descriptor1.query = "";
RequestDescriptor descriptor2 = descriptor1;
descriptor2.query = "framework_id=nonexisting-framework-id";
RequestDescriptor descriptor3 = descriptor1;
descriptor3.query = "jsonp=xxx";
auto responses = launchSimultaneousRequests(
{descriptor1, descriptor2, descriptor3});
JSON::Value expected = JSON::parse(
"{"
"\"frameworks\": [{"
"\"id\": \"" + stringify(frameworkId_) + "\""
"}]"
"}"
).get();
foreachpair (
const RequestDescriptor& request,
Future<Response>& response,
responses)
{
AWAIT_READY(response);
if (strings::contains(request.query, "jsonp")) {
EXPECT_TRUE(strings::contains(response->body, "xxx"))
<< "Got HTTP response: " << response->body;
continue;
}
Try<JSON::Value> jsonResponse = JSON::parse(response->body);
ASSERT_SOME(jsonResponse);
if (strings::contains(request.query, "framework_id")) {
EXPECT_FALSE(jsonResponse->contains(expected))
<< "Got HTTP response: " << response->body;
} else {
EXPECT_TRUE(jsonResponse->contains(expected))
<< "Got HTTP response: " << response->body;
}
}
// Ensure that we actually hit the metrics code path while executing
// the test.
JSON::Object metrics = Metrics();
ASSERT_TRUE(metrics.values["master/http_cache_hits"].is<JSON::Number>());
ASSERT_GT(
metrics.values["master/http_cache_hits"].as<JSON::Number>().as<size_t>(),
0u);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {