blob: a41c91f10bc0eedc754425b4de1b3e184c4ffb08 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "master/master.hpp"
#include <vector>
#include <google/protobuf/repeated_field.h>
#include <mesos/resources.hpp>
#include <mesos/quota/quota.hpp>
#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/json.hpp>
#include <stout/protobuf.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/utils.hpp>
#include "logging/logging.hpp"
#include "master/quota.hpp"
#include "master/registrar.hpp"
namespace http = process::http;
using google::protobuf::RepeatedPtrField;
using http::Accepted;
using http::BadRequest;
using http::Conflict;
using http::Forbidden;
using http::OK;
using mesos::quota::QuotaInfo;
using mesos::quota::QuotaRequest;
using mesos::quota::QuotaStatus;
using process::Future;
using process::Owned;
using std::string;
using std::vector;
namespace mesos {
namespace internal {
namespace master {
// Heuristically decides whether the cluster is large enough to honor
// `request` on top of every quota that is already stored in the master.
//
// Returns `None()` as soon as the accumulated unreserved resources of
// the connected, active agents contain the sum of all quota guarantees
// (including the requested one); returns an `Error` otherwise.
Option<Error> Master::QuotaHandler::capacityHeuristic(
    const QuotaInfo& request) const
{
  VLOG(1) << "Performing capacity heuristic check for a set quota request";

  // Both conditions are expected to have been validated by the caller.
  CHECK(master->isWhitelistedRole(request.role()));
  CHECK(!master->quotas.contains(request.role()));

  // Start from the requested guarantee and add the guarantee of every
  // quota already set. The role in the request has no quota yet (see the
  // check above), so nothing is counted twice.
  // TODO(alexr): Relax this constraint once we allow updating quotas.
  Resources requiredResources = request.guarantee();
  foreachvalue (const Quota& existing, master->quotas) {
    requiredResources += existing.info.guarantee();
  }

  // Accumulate non-static resources agent by agent and stop as soon as
  // the requirement is covered. The early exit only saves work; it does
  // not change the outcome of the inequality being checked.
  Resources availableResources;
  foreachvalue (Slave* agent, master->slaves.registered) {
    // Disconnected or inactive agents do not participate in resource
    // allocation, so they are left out of the sum.
    const bool participating = agent->connected && agent->active;
    if (!participating) {
      continue;
    }

    // NOTE: Dynamic reservations are not excluded here because they do
    // not show up in `SlaveInfo` resources. Unlike static reservations
    // they may be unreserved at any time, which makes the resources
    // available for quota'ed frameworks.
    availableResources += Resources(agent->info.resources()).unreserved();

    if (availableResources.contains(requiredResources)) {
      return None();
    }
  }

  // Every eligible agent has been counted and the total still falls
  // short, hence the request does not pass the heuristic.
  return Error(
      "Not enough available cluster capacity to reasonably satisfy quota "
      "request; the force flag can be used to override this check");
}
void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
{
const string& role = request.role();
// This should have been validated earlier.
CHECK(master->isWhitelistedRole(role));
int frameworksInRole = 0;
if (master->activeRoles.contains(role)) {
Role* roleState = master->activeRoles[role];
foreachvalue (const Framework* framework, roleState->frameworks) {
if (framework->connected && framework->active) {
++frameworksInRole;
}
}
}
// The resources recovered by rescinding outstanding offers.
Resources rescinded;
int visitedAgents = 0;
// Because resources are allocated in the allocator, there can be a race
// between rescinding and allocating. This race makes it hard to determine
// the exact amount of offers that should be rescinded in the master.
//
// We pessimistically assume that what seems like "available" resources
// in the allocator will be gone. We greedily rescind all offers from an
// agent at once until we have rescinded "enough" offers. Offers containing
// resources irrelevant to the quota request may be rescinded, as we
// rescind all offers on an agent. This is done to maintain the
// coarse-grained nature of agent offers, and helps reduce fragmentation of
// offers.
//
// Consider a quota request for role `role` for `requested` resources.
// There are `numFiR` frameworks in `role`. Let `rescinded` be the total
// number of rescinded resources and `numVA` be the number of visited
// agents, from which at least one offer has been rescinded. Then the
// algorithm can be summarized as follows:
//
// while (there are agents with outstanding offers) do:
// if ((`rescinded` contains `requested`) && (`numVA` >= `numFiR`) break;
// fetch an agent `a` with outstanding offers;
// rescind all outstanding offers from `a`;
// update `rescinded`, inc(numVA);
// end.
foreachvalue (const Slave* slave, master->slaves.registered) {
// If we have rescinded offers with at least as many resources as the
// quota request resources, then we are done.
if (rescinded.contains(request.guarantee()) &&
(visitedAgents >= frameworksInRole)) {
break;
}
// As in the capacity heuristic, we do not consider disconnected or
// inactive agents, because they do not participate in resource
// allocation.
if (!slave->connected || !slave->active) {
continue;
}
// TODO(alexr): Consider only rescinding from agents that have at least
// one resource relevant to the quota request.
// Rescind all outstanding offers from the given agent.
bool agentVisited = false;
foreach (Offer* offer, utils::copy(slave->offers)) {
master->allocator->recoverResources(
offer->framework_id(), offer->slave_id(), offer->resources(), None());
rescinded += offer->resources();
master->removeOffer(offer, true);
agentVisited = true;
}
if (agentVisited) {
++visitedAgents;
}
}
}
// Handles a set quota request.
//
// The request body is parsed as JSON, converted to a `QuotaRequest`,
// turned into a `QuotaInfo` and validated; any failure along the way is
// answered with `BadRequest`. A valid request is then authorized for
// `principal`: unauthorized requests yield `Forbidden`, authorized ones
// are passed on to `_set()`.
Future<http::Response> Master::QuotaHandler::set(
    const http::Request& request,
    const Option<string>& principal) const
{
  VLOG(1) << "Setting quota from request: '" << request.body << "'";

  // The master only routes POST requests here.
  CHECK_EQ("POST", request.method);

  // Step 1: body -> JSON object.
  Try<JSON::Object> parsedJson = JSON::parse<JSON::Object>(request.body);
  if (parsedJson.isError()) {
    return BadRequest(
        "Failed to parse set quota request JSON '" + request.body + "': " +
        parsedJson.error());
  }

  // Step 2: JSON object -> `QuotaRequest` protobuf.
  Try<QuotaRequest> quotaRequest =
    ::protobuf::parse<QuotaRequest>(parsedJson.get());
  if (quotaRequest.isError()) {
    return BadRequest(
        "Failed to validate set quota request JSON '" + request.body + "': " +
        quotaRequest.error());
  }

  // Step 3: `QuotaRequest` -> `QuotaInfo` protobuf.
  Try<QuotaInfo> created = quota::createQuotaInfo(quotaRequest.get());
  if (created.isError()) {
    return BadRequest(
        "Failed to create 'QuotaInfo' from set quota request JSON '" +
        request.body + "': " + created.error());
  }

  QuotaInfo quotaInfo = created.get();

  // Step 4: the resulting `QuotaInfo` must be a valid quota request.
  Option<Error> invalid = quota::validation::quotaInfo(quotaInfo);
  if (invalid.isSome()) {
    return BadRequest(
        "Failed to validate set quota request JSON '" + request.body + "': " +
        invalid.get().message);
  }

  // Step 5: the role must be on the role whitelist, if there is one.
  if (!master->isWhitelistedRole(quotaInfo.role())) {
    return BadRequest(
        "Failed to validate set quota request JSON '" + request.body +
        "': Unknown role '" + quotaInfo.role() + "'");
  }

  // Step 6: updating an existing quota is not supported.
  // TODO(joerg84): Update error message once quota update is in place.
  if (master->quotas.contains(quotaInfo.role())) {
    return BadRequest(
        "Failed to validate set quota request JSON '" + request.body +
        "': Can not set quota for a role that already has quota");
  }

  // The force flag is used to overwrite the `capacityHeuristic` check.
  const bool forced = quotaRequest.get().force();

  // Remember who asked for the quota, if the requester is known.
  if (principal.isSome()) {
    quotaInfo.set_principal(principal.get());
  }

  return authorizeSetQuota(principal, quotaInfo.role())
    .then(defer(master->self(), [=](bool authorized) -> Future<http::Response> {
      if (authorized) {
        return _set(quotaInfo, forced);
      }

      return Forbidden();
    }));
}
// Second phase of a set quota request; runs after the request has been
// authorized.
//
// Unless `forced` is set, the capacity heuristic is consulted and a
// failing check is answered with `Conflict`. Otherwise the quota is
// recorded in the master's local state, persisted through the registrar
// and, once the registry update succeeded, handed to the allocator,
// after which outstanding offers are rescinded and `OK` is returned.
//
// Returns `BadRequest` if the role already has a quota by the time this
// phase runs.
Future<http::Response> Master::QuotaHandler::_set(
    const QuotaInfo& quotaInfo,
    bool forced) const
{
  // Re-validate that the role has no quota yet. `set()` performs this
  // check before the asynchronous authorization step, so a second request
  // for the same role may have been accepted in the meantime. Without the
  // re-check such a request would either trip the `CHECK` in
  // `capacityHeuristic()` or, when forced, silently overwrite the quota.
  if (master->quotas.contains(quotaInfo.role())) {
    return BadRequest(
        "Failed to set quota for role '" + quotaInfo.role() +
        "': Can not set quota for a role that already has quota");
  }

  if (forced) {
    VLOG(1) << "Using force flag to override quota capacity heuristic check";
  } else {
    // Validate whether a quota request can be satisfied.
    Option<Error> error = capacityHeuristic(quotaInfo);
    if (error.isSome()) {
      return Conflict(
          "Heuristic capacity check for set quota request failed: " +
          error.get().message);
    }
  }

  Quota quota = Quota{quotaInfo};

  // Populate master's quota-related local state. We do this before updating
  // the registry in order to make sure that we are not already trying to
  // satisfy a request for this role (since this is a multi-phase event).
  // NOTE: We do not need to remove quota for the role if the registry update
  // fails because in this case the master fails as well.
  master->quotas[quotaInfo.role()] = quota;

  // Update the registry with the new quota and acknowledge the request.
  return master->registrar->apply(Owned<Operation>(
      new quota::UpdateQuota(quotaInfo)))
    .then(defer(master->self(), [=](bool result) -> Future<http::Response> {
      // See the top comment in "master/quota.hpp" for why this check is here.
      CHECK(result);

      master->allocator->setQuota(quotaInfo.role(), quota);

      // Rescind outstanding offers to facilitate satisfying the quota request.
      // NOTE: We set quota before we rescind to avoid a race. If we were to
      // rescind first, then recovered resources may get allocated again
      // before our call to `setQuota` was handled.
      // The consequence of setting quota first is that (in the hierarchical
      // allocator) it will trigger an allocation. This means the rescinded
      // offer resources will only be available to quota once another
      // allocation is invoked.
      // This can be resolved in the future with an explicit allocation call,
      // and this solution is preferred to having the race described earlier.
      rescindOffers(quotaInfo);

      return OK();
    }));
}
// Handles a remove quota request.
//
// The role is taken from the last component of the request path, which
// must have the form {master, quota, 'role'}. Malformed paths, unknown
// roles and roles without quota are answered with `BadRequest`. The
// request is then authorized for `principal` against the principal stored
// with the quota (if any): unauthorized requests yield `Forbidden`,
// authorized ones are passed on to `_remove()`.
Future<http::Response> Master::QuotaHandler::remove(
    const http::Request& request,
    const Option<string>& principal) const
{
  VLOG(1) << "Removing quota for request path: '" << request.url.path << "'";

  // The master only routes DELETE requests here.
  CHECK_EQ("DELETE", request.method);

  // Split the path into its components; exactly three are expected.
  vector<string> pathParts = strings::tokenize(request.url.path, "/");
  if (pathParts.size() != 3u) {
    return BadRequest(
        "Failed to parse request path '" + request.url.path +
        "': 3 tokens ('master', 'quota', 'role') required, found " +
        stringify(pathParts.size()) + " token(s)");
  }

  // With three components, the second to last one is at index 1 and must
  // name the "quota" endpoint.
  if (pathParts[1] != "quota") {
    return BadRequest(
        "Failed to parse request path '" + request.url.path +
        "': Missing 'quota' endpoint");
  }

  const string& role = pathParts.back();

  // The role must be on the role whitelist, if there is one.
  if (!master->isWhitelistedRole(role)) {
    return BadRequest(
        "Failed to validate remove quota request for path '" +
        request.url.path + "': Unknown role '" + role + "'");
  }

  // Only an existing quota can be removed.
  if (!master->quotas.contains(role)) {
    return BadRequest(
        "Failed to remove quota for path '" + request.url.path +
        "': Role '" + role + "' has no quota set");
  }

  // Look up the principal recorded with the quota, if there is one.
  const QuotaInfo& storedInfo = master->quotas[role].info;

  Option<string> quota_principal = None();
  if (storedInfo.has_principal()) {
    quota_principal = storedInfo.principal();
  }

  return authorizeRemoveQuota(principal, quota_principal)
    .then(defer(master->self(), [=](bool authorized) -> Future<http::Response> {
      if (authorized) {
        return _remove(role);
      }

      return Forbidden();
    }));
}
// Second phase of a remove quota request; runs after the request has
// been authorized.
//
// The quota is dropped from the master's local state, the removal is
// persisted through the registrar and, once the registry update
// succeeded, the allocator is told to remove the quota. Returns `OK`.
Future<http::Response> Master::QuotaHandler::_remove(const string& role) const
{
  // Drop the quota from the local state before touching the registry, so
  // that we are not already trying to remove quota for this role (this is
  // a multi-phase event).
  // NOTE: There is no need to restore the quota if the registry update
  // fails, because in that case the master fails as well and quota will
  // be restored automatically during the recovery.
  master->quotas.erase(role);

  // Persist the removal; the request is acknowledged once it is applied.
  Owned<Operation> removal(new quota::RemoveQuota(role));

  return master->registrar->apply(removal)
    .then(defer(master->self(), [=](bool result) -> Future<http::Response> {
      // See the top comment in "master/quota.hpp" for why this check is here.
      CHECK(result);

      master->allocator->removeQuota(role);

      return OK();
    }));
}
// Handles a quota status request.
//
// Responds with `OK` carrying a JSON rendering of a `QuotaStatus` message
// that holds a copy of the `QuotaInfo` of every quota stored in the
// master. The "jsonp" query parameter, if present, is forwarded to the
// response.
Future<http::Response> Master::QuotaHandler::status(
    const http::Request& request) const
{
  VLOG(1) << "Handling quota status request";

  // The master only routes GET requests here.
  CHECK_EQ("GET", request.method);

  QuotaStatus quotaStatus;

  // Reserve room for all entries up front, then copy each quota's info.
  RepeatedPtrField<QuotaInfo>* infos = quotaStatus.mutable_infos();
  infos->Reserve(static_cast<int>(master->quotas.size()));

  foreachvalue (const Quota& quota, master->quotas) {
    infos->Add()->CopyFrom(quota.info);
  }

  return OK(JSON::protobuf(quotaStatus), request.url.query.get("jsonp"));
}
// Asks the master's authorizer whether `principal` may request quota for
// `role`.
//
// Resolves to `true` right away when the master has no authorizer.
// Otherwise a `SetQuota` ACL request is built, naming the principal (or
// the `ANY` entity type when no principal is given) and the role, and the
// authorizer's answer is returned.
Future<bool> Master::QuotaHandler::authorizeSetQuota(
    const Option<string>& principal,
    const string& role) const
{
  // Without an authorizer every request is allowed.
  if (master->authorizer.isNone()) {
    return true;
  }

  const string requester = principal.isSome() ? principal.get() : "ANY";

  LOG(INFO) << "Authorizing principal '" << requester
            << "' to request quota for role '" << role << "'";

  mesos::ACL::SetQuota acl;

  // Subject: the given principal, or anyone if none was supplied.
  auto* principals = acl.mutable_principals();
  if (principal.isNone()) {
    principals->set_type(mesos::ACL::Entity::ANY);
  } else {
    principals->add_values(principal.get());
  }

  // Object: the role the quota is requested for.
  acl.mutable_roles()->add_values(role);

  return master->authorizer.get()->authorize(acl);
}
// Asks the master's authorizer whether `requestPrincipal` may remove a
// quota that was set by `quotaPrincipal`.
//
// Resolves to `true` right away when the master has no authorizer.
// Otherwise a `RemoveQuota` ACL request is built in which each missing
// principal is represented by the `ANY` entity type, and the authorizer's
// answer is returned.
Future<bool> Master::QuotaHandler::authorizeRemoveQuota(
    const Option<string>& requestPrincipal,
    const Option<string>& quotaPrincipal) const
{
  // Without an authorizer every request is allowed.
  if (master->authorizer.isNone()) {
    return true;
  }

  const string requester =
    requestPrincipal.isSome() ? requestPrincipal.get() : "ANY";
  const string owner =
    quotaPrincipal.isSome() ? quotaPrincipal.get() : "ANY";

  LOG(INFO) << "Authorizing principal '" << requester
            << "' to remove quota set by '" << owner << "'";

  mesos::ACL::RemoveQuota acl;

  // Subject: the principal issuing the removal, or anyone.
  auto* principals = acl.mutable_principals();
  if (requestPrincipal.isNone()) {
    principals->set_type(mesos::ACL::Entity::ANY);
  } else {
    principals->add_values(requestPrincipal.get());
  }

  // Object: the principal that set the quota, or anyone.
  auto* quotaPrincipals = acl.mutable_quota_principals();
  if (quotaPrincipal.isNone()) {
    quotaPrincipals->set_type(mesos::ACL::Entity::ANY);
  } else {
    quotaPrincipals->add_values(quotaPrincipal.get());
  }

  return master->authorizer.get()->authorize(acl);
}
} // namespace master {
} // namespace internal {
} // namespace mesos {