// blob: 5654e7e3e188452ee1e59e2933d65f27a09a5dc7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <algorithm>
#include <iomanip>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
#include <mesos/attributes.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/maintenance/maintenance.hpp>
#include <mesos/master/master.hpp>
#include <mesos/v1/master/master.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/help.hpp>
#include <process/logging.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/base64.hpp>
#include <stout/errorbase.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/jsonify.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/representation.hpp>
#include <stout/result.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "common/build.hpp"
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
#include "internal/devolve.hpp"
#include "logging/logging.hpp"
#include "master/machine.hpp"
#include "master/maintenance.hpp"
#include "master/master.hpp"
#include "master/validation.hpp"
#include "mesos/mesos.hpp"
#include "mesos/resources.hpp"
#include "version/version.hpp"
using google::protobuf::RepeatedPtrField;
using process::AUTHENTICATION;
using process::AUTHORIZATION;
using process::Clock;
using process::DESCRIPTION;
using process::Failure;
using process::Future;
using process::HELP;
using process::Logging;
using process::TLDR;
using process::http::Accepted;
using process::http::BadRequest;
using process::http::Conflict;
using process::http::Forbidden;
using process::http::InternalServerError;
using process::http::MethodNotAllowed;
using process::http::NotFound;
using process::http::NotImplemented;
using process::http::NotAcceptable;
using process::http::OK;
using process::http::Pipe;
using process::http::ServiceUnavailable;
using process::http::TemporaryRedirect;
using process::http::UnsupportedMediaType;
using process::http::URL;
using process::http::authentication::Principal;
using std::copy_if;
using std::list;
using std::map;
using std::set;
using std::string;
using std::tie;
using std::tuple;
using std::vector;
namespace mesos {
using mesos::authorization::createSubject;
// Appends the `Type_Name` string for the type of a framework
// capability to the given string writer.
static void json(
    JSON::StringWriter* writer,
    const FrameworkInfo::Capability& capability)
{
  const auto& typeName =
    FrameworkInfo::Capability::Type_Name(capability.type());

  writer->append(typeName);
}
// Appends the `Type_Name` string for the type of an agent
// capability to the given string writer.
static void json(
    JSON::StringWriter* writer,
    const SlaveInfo::Capability& capability)
{
  const auto& typeName =
    SlaveInfo::Capability::Type_Name(capability.type());

  writer->append(typeName);
}
// Writes an `Offer` as a JSON object containing its id, framework id,
// allocation info, agent id and resources (in that order).
static void json(JSON::ObjectWriter* writer, const Offer& offer)
{
  const string& offerId = offer.id().value();
  const string& frameworkId = offer.framework_id().value();
  const string& slaveId = offer.slave_id().value();

  writer->field("id", offerId);
  writer->field("framework_id", frameworkId);

  // The allocation info is emitted via the generic protobuf -> JSON
  // conversion rather than a hand-written model.
  writer->field("allocation_info", JSON::Protobuf(offer.allocation_info()));

  writer->field("slave_id", slaveId);
  writer->field("resources", Resources(offer.resources()));
}
// Writes a `MasterInfo` as a JSON object. The "domain" field is
// only present when the master info carries a domain.
static void json(JSON::ObjectWriter* writer, const MasterInfo& info)
{
  writer->field("id", info.id());
  writer->field("pid", info.pid());
  writer->field("port", info.port());
  writer->field("hostname", info.hostname());

  if (!info.has_domain()) {
    return;
  }

  writer->field("domain", info.domain());
}
// Writes a `SlaveInfo` as a JSON object. The "domain" field is
// only present when the agent info carries a domain.
static void json(JSON::ObjectWriter* writer, const SlaveInfo& slaveInfo)
{
  const string& slaveId = slaveInfo.id().value();

  writer->field("id", slaveId);
  writer->field("hostname", slaveInfo.hostname());
  writer->field("port", slaveInfo.port());
  writer->field("attributes", Attributes(slaveInfo.attributes()));

  if (!slaveInfo.has_domain()) {
    return;
  }

  writer->field("domain", slaveInfo.domain());
}
namespace internal {
namespace master {
// Pull in model overrides from common.
using mesos::internal::model;
// Pull in definitions from process.
using process::http::Response;
using process::http::Request;
using process::Owned;
// Tag type selecting the summarized JSON form of `T`, as served by
// the `/state-summary` endpoint (for example `Summary<Slave>`).
// It adds nothing to `Representation<T>` beyond a distinct type, so
// that a dedicated `json` overload can be selected for it.
template <typename T>
struct Summary : public Representation<T>
{
  // Reuse the constructors of `Representation<T>` unchanged.
  using Representation<T>::Representation;
};
// Tag type selecting the complete JSON form of `T`, as served by
// the `/state` endpoint (for example `Full<Slave>`).
// It adds nothing to `Representation<T>` beyond a distinct type, so
// that a dedicated `json` overload can be selected for it.
template <typename T>
struct Full : public Representation<T>
{
  // Reuse the constructors of `Representation<T>` unchanged.
  using Representation<T>::Representation;
};
// Forward declaration for `FullFrameworkWriter`.
static void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary);


// Filtered representation of Full<Framework>.
// Executors and Tasks are filtered based on whether the
// user is authorized to view them.
//
// NOTE: The acceptors are stored by reference and the framework by
// pointer; an instance must not outlive the objects it was built from.
struct FullFrameworkWriter {
  // `authorizeTask` decides which tasks are written, `authorizeExecutorInfo`
  // decides which executors are written, and `framework` is the framework
  // being modelled.
  FullFrameworkWriter(
      const Owned<AuthorizationAcceptor>& authorizeTask,
      const Owned<AuthorizationAcceptor>& authorizeExecutorInfo,
      const Framework* framework)
    : authorizeTask_(authorizeTask),
      authorizeExecutorInfo_(authorizeExecutorInfo),
      framework_(framework) {}

  // Writes the framework into the given JSON object: first the fields of
  // the `Summary<Framework>` overload, then the additional "full" fields,
  // tasks, offers, executors and labels.
  void operator()(JSON::ObjectWriter* writer) const
  {
    json(writer, Summary<Framework>(*framework_));

    // Add additional fields to those generated by the
    // `Summary<Framework>` overload.
    writer->field("user", framework_->info.user());
    writer->field("failover_timeout", framework_->info.failover_timeout());
    writer->field("checkpoint", framework_->info.checkpoint());
    writer->field("registered_time", framework_->registeredTime.secs());
    writer->field("unregistered_time", framework_->unregisteredTime.secs());

    if (framework_->info.has_principal()) {
      writer->field("principal", framework_->info.principal());
    }

    // TODO(bmahler): Consider deprecating this in favor of the split
    // used and offered resources added in `Summary<Framework>`.
    writer->field(
        "resources",
        framework_->totalUsedResources + framework_->totalOfferedResources);

    // TODO(benh): Consider making reregisteredTime an Option.
    if (framework_->registeredTime != framework_->reregisteredTime) {
      writer->field("reregistered_time", framework_->reregisteredTime.secs());
    }

    // For multi-role frameworks the `role` field will be unset.
    // Note that we could set `roles` here for both cases, which
    // would make tooling simpler (only need to look for `roles`).
    // However, we opted to just mirror the protobuf akin to how
    // generic protobuf -> JSON translation works.
    if (framework_->capabilities.multiRole) {
      writer->field("roles", framework_->info.roles());
    } else {
      writer->field("role", framework_->info.role());
    }

    // Model all of the tasks associated with a framework.
    writer->field("tasks", [this](JSON::ArrayWriter* writer) {
      // Pending tasks only exist as `TaskInfo`, so they are modelled
      // by hand below and always reported in state `TASK_STAGING`.
      foreachvalue (const TaskInfo& taskInfo, framework_->pendingTasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(taskInfo, framework_->info)) {
          continue;
        }

        writer->element([this, &taskInfo](JSON::ObjectWriter* writer) {
          writer->field("id", taskInfo.task_id().value());
          writer->field("name", taskInfo.name());
          writer->field("framework_id", framework_->id().value());

          writer->field(
              "executor_id",
              taskInfo.executor().executor_id().value());

          writer->field("slave_id", taskInfo.slave_id().value());
          writer->field("state", TaskState_Name(TASK_STAGING));
          writer->field("resources", Resources(taskInfo.resources()));

          // Tasks are not allowed to mix resources allocated to
          // different roles, see MESOS-6636.
          //
          // The role is read from the first resource. Guard against a
          // pending task that carries no resources at all, in which
          // case there is no first resource to read and the field is
          // omitted rather than dereferencing `begin()` of an empty
          // sequence.
          if (taskInfo.resources_size() > 0) {
            writer->field(
                "role",
                taskInfo.resources().begin()->allocation_info().role());
          }

          writer->field("statuses", std::initializer_list<TaskStatus>{});

          if (taskInfo.has_labels()) {
            writer->field("labels", taskInfo.labels());
          }

          if (taskInfo.has_discovery()) {
            writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
          }

          if (taskInfo.has_container()) {
            writer->field("container", JSON::Protobuf(taskInfo.container()));
          }
        });
      }

      foreachvalue (Task* task, framework_->tasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(*task, framework_->info)) {
          continue;
        }

        writer->element(*task);
      }
    });

    writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
      foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(*task.get(), framework_->info)) {
          continue;
        }

        writer->element(*task.get());
      }
    });

    writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) {
      foreach (const Owned<Task>& task, framework_->completedTasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(*task.get(), framework_->info)) {
          continue;
        }

        writer->element(*task.get());
      }
    });

    // Model all of the offers associated with a framework.
    writer->field("offers", [this](JSON::ArrayWriter* writer) {
      foreach (Offer* offer, framework_->offers) {
        writer->element(*offer);
      }
    });

    // Model all of the executors of a framework.
    writer->field("executors", [this](JSON::ArrayWriter* writer) {
      foreachpair (
          const SlaveID& slaveId,
          const auto& executorsMap,
          framework_->executors) {
        foreachvalue (const ExecutorInfo& executor, executorsMap) {
          // Skip unauthorized executors. The check has to happen before
          // `element()` is called: once an element has been started, an
          // early return from its callback would still leave an empty
          // object in the array for the unauthorized executor.
          if (!authorizeExecutorInfo_->accept(executor, framework_->info)) {
            continue;
          }

          writer->element([&executor, &slaveId](JSON::ObjectWriter* writer) {
            json(writer, executor);
            writer->field("slave_id", slaveId.value());
          });
        }
      }
    });

    // Model all of the labels associated with a framework.
    if (framework_->info.has_labels()) {
      writer->field("labels", framework_->info.labels());
    }
  }

  const Owned<AuthorizationAcceptor>& authorizeTask_;
  const Owned<AuthorizationAcceptor>& authorizeExecutorInfo_;
  const Framework* framework_;
};
struct SlaveWriter
{
SlaveWriter(
const Slave& slave,
const Owned<AuthorizationAcceptor>& authorizeRole)
: slave_(slave), authorizeRole_(authorizeRole) {}
void operator()(JSON::ObjectWriter* writer) const
{
json(writer, slave_.info);
writer->field("pid", string(slave_.pid));
writer->field("registered_time", slave_.registeredTime.secs());
if (slave_.reregisteredTime.isSome()) {
writer->field("reregistered_time", slave_.reregisteredTime.get().secs());
}
const Resources& totalResources = slave_.totalResources;
writer->field("resources", totalResources);
writer->field("used_resources", Resources::sum(slave_.usedResources));
writer->field("offered_resources", slave_.offeredResources);
writer->field(
"reserved_resources",
[&totalResources, this](JSON::ObjectWriter* writer) {
foreachpair (const string& role, const Resources& reservation,
totalResources.reservations()) {
// TODO(arojas): Consider showing unapproved resources in an
// aggregated special field, so that all resource values add up
// MESOS-7779.
if (authorizeRole_->accept(role)) {
writer->field(role, reservation);
}
}
});
writer->field("unreserved_resources", totalResources.unreserved());
writer->field("active", slave_.active);
writer->field("version", slave_.version);
writer->field("capabilities", slave_.capabilities.toRepeatedPtrField());
}
const Slave& slave_;
const Owned<AuthorizationAcceptor>& authorizeRole_;
};
struct SlavesWriter
{
SlavesWriter(
const Master::Slaves& slaves,
const Owned<AuthorizationAcceptor>& authorizeRole,
const IDAcceptor<SlaveID>& selectSlaveId)
: slaves_(slaves),
authorizeRole_(authorizeRole),
selectSlaveId_(selectSlaveId) {}
void operator()(JSON::ObjectWriter* writer) const
{
writer->field("slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Slave* slave, slaves_.registered) {
if (!selectSlaveId_.accept(slave->id)) {
continue;
}
writer->element([this, &slave](JSON::ObjectWriter* writer) {
writeSlave(slave, writer);
});
}
});
writer->field("recovered_slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const SlaveInfo& slaveInfo, slaves_.recovered) {
if (!selectSlaveId_.accept(slaveInfo.id())) {
continue;
}
writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
json(writer, slaveInfo);
});
}
});
}
void writeSlave(const Slave* slave, JSON::ObjectWriter* writer) const
{
SlaveWriter(*slave, authorizeRole_)(writer);
// Add the complete protobuf->JSON for all used, reserved,
// and offered resources. The other endpoints summarize
// resource information, which omits the details of
// reservations and persistent volumes. Full resource
// information is necessary so that operators can use the
// `/unreserve` and `/destroy-volumes` endpoints.
hashmap<string, Resources> reserved = slave->totalResources.reservations();
writer->field(
"reserved_resources_full",
[&reserved, this](JSON::ObjectWriter* writer) {
foreachpair (const string& role,
const Resources& resources,
reserved) {
if (authorizeRole_->accept(role)) {
writer->field(role, [&resources, this](
JSON::ArrayWriter* writer) {
foreach (Resource resource, resources) {
if (authorizeResource(resource, authorizeRole_)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
}
}
});
Resources unreservedResources = slave->totalResources.unreserved();
writer->field(
"unreserved_resources_full",
[&unreservedResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, unreservedResources) {
if (authorizeResource(resource, authorizeRole_)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
Resources usedResources = Resources::sum(slave->usedResources);
writer->field(
"used_resources_full",
[&usedResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, usedResources) {
if (authorizeResource(resource, authorizeRole_)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
const Resources& offeredResources = slave->offeredResources;
writer->field(
"offered_resources_full",
[&offeredResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, offeredResources) {
if (authorizeResource(resource, authorizeRole_)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
}
const Master::Slaves& slaves_;
const Owned<AuthorizationAcceptor>& authorizeRole_;
const IDAcceptor<SlaveID>& selectSlaveId_;
};
// Writes the summarized model of a framework. These fields are also
// the leading fields of the full framework model.
static void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary)
{
  const Framework& fw = summary;
  const FrameworkInfo& info = fw.info;

  writer->field("id", fw.id().value());
  writer->field("name", info.name());

  // Omit pid for http frameworks.
  const auto& pid = fw.pid;
  if (pid.isSome()) {
    writer->field("pid", string(pid.get()));
  }

  // TODO(bmahler): Use these in the webui.
  writer->field("used_resources", fw.totalUsedResources);
  writer->field("offered_resources", fw.totalOfferedResources);

  writer->field("capabilities", info.capabilities());
  writer->field("hostname", info.hostname());
  writer->field("webui_url", info.webui_url());

  writer->field("active", fw.active());
  writer->field("connected", fw.connected());
  writer->field("recovered", fw.recovered());
}
// Builds the help text for the master's operator API endpoint.
string Master::Http::API_HELP()
{
  const auto summary = TLDR(
      "Endpoint for API calls against the master.");

  const auto details = DESCRIPTION(
      "Returns 200 OK when the request was processed successfully.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.");

  const auto authenticationNote = AUTHENTICATION(true);

  const auto authorizationNote = AUTHORIZATION(
      "The information returned by this endpoint for certain calls",
      "might be filtered based on the user accessing it.",
      "For example a user might only see the subset of frameworks,",
      "tasks, and executors they are allowed to view.",
      "See the authorization documentation for details.");

  return HELP(summary, details, authenticationNote, authorizationNote);
}
// Entry point for calls against the master's operator API.
//
// The request is checked in this order: principal sanity, leadership
// (redirect), recovery state, HTTP method, 'Content-Type' and body
// decoding, call validation, and 'Accept' negotiation. A call that
// passes all checks is dispatched to the handler for its type.
Future<Response> Master::Http::api(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalLacksValue =
    principal.isSome() && principal->value.isNone();

  if (principalLacksValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // TODO(vinod): Add metrics for rejected requests.

  // TODO(vinod): Add support for rate limiting.

  // When current master is not the leader, redirect to the leading master.
  // Note that this could happen when an operator, or some other
  // service, including a scheduler realizes this is the leading
  // master before the master itself realizes it, e.g., due to a
  // ZooKeeper watch delay.
  if (!master->elected()) {
    return redirect(request);
  }

  CHECK_SOME(master->recovered);

  if (!master->recovered.get().isReady()) {
    return ServiceUnavailable("Master has not finished recovery");
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  v1::master::Call v1Call;

  // TODO(anand): Content type values are case-insensitive.
  Option<string> contentTypeHeader = request.headers.get("Content-Type");

  if (contentTypeHeader.isNone()) {
    return BadRequest("Expecting 'Content-Type' to be present");
  }

  const string& mediaType = contentTypeHeader.get();

  if (mediaType == APPLICATION_PROTOBUF) {
    // Binary protobuf body.
    const bool parsed = v1Call.ParseFromString(request.body);
    if (!parsed) {
      return BadRequest("Failed to parse body into Call protobuf");
    }
  } else if (mediaType == APPLICATION_JSON) {
    // JSON body: parse the text first, then convert to protobuf.
    Try<JSON::Value> parsedJson = JSON::parse(request.body);
    if (parsedJson.isError()) {
      return BadRequest(
          "Failed to parse body into JSON: " + parsedJson.error());
    }

    Try<v1::master::Call> converted =
      ::protobuf::parse<v1::master::Call>(parsedJson.get());

    if (converted.isError()) {
      return BadRequest("Failed to convert JSON into Call protobuf: " +
                        converted.error());
    }

    v1Call = converted.get();
  } else {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' of ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
  }

  mesos::master::Call call = devolve(v1Call);

  Option<Error> invalid = validation::master::call::validate(call, principal);

  if (invalid.isSome()) {
    return BadRequest("Failed to validate master::Call: " +
                      invalid.get().message);
  }

  LOG(INFO) << "Processing call " << call.type();

  // JSON is preferred whenever the client accepts it; protobuf is only
  // chosen when JSON is not acceptable.
  const bool acceptsJson = request.acceptsMediaType(APPLICATION_JSON);

  if (!acceptsJson && !request.acceptsMediaType(APPLICATION_PROTOBUF)) {
    return NotAcceptable(
        string("Expecting 'Accept' to allow ") +
        "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
  }

  const ContentType acceptType =
    acceptsJson ? ContentType::JSON : ContentType::PROTOBUF;

  switch (call.type()) {
    case mesos::master::Call::UNKNOWN:
      return NotImplemented();

    case mesos::master::Call::GET_HEALTH:
      return getHealth(call, principal, acceptType);

    case mesos::master::Call::GET_FLAGS:
      return getFlags(call, principal, acceptType);

    case mesos::master::Call::GET_VERSION:
      return getVersion(call, principal, acceptType);

    case mesos::master::Call::GET_METRICS:
      return getMetrics(call, principal, acceptType);

    case mesos::master::Call::GET_LOGGING_LEVEL:
      return getLoggingLevel(call, principal, acceptType);

    case mesos::master::Call::SET_LOGGING_LEVEL:
      return setLoggingLevel(call, principal, acceptType);

    case mesos::master::Call::LIST_FILES:
      return listFiles(call, principal, acceptType);

    case mesos::master::Call::READ_FILE:
      return readFile(call, principal, acceptType);

    case mesos::master::Call::GET_STATE:
      return getState(call, principal, acceptType);

    case mesos::master::Call::GET_AGENTS:
      return getAgents(call, principal, acceptType);

    case mesos::master::Call::GET_FRAMEWORKS:
      return getFrameworks(call, principal, acceptType);

    case mesos::master::Call::GET_EXECUTORS:
      return getExecutors(call, principal, acceptType);

    case mesos::master::Call::GET_TASKS:
      return getTasks(call, principal, acceptType);

    case mesos::master::Call::GET_ROLES:
      return getRoles(call, principal, acceptType);

    case mesos::master::Call::GET_WEIGHTS:
      return weightsHandler.get(call, principal, acceptType);

    case mesos::master::Call::UPDATE_WEIGHTS:
      return weightsHandler.update(call, principal, acceptType);

    case mesos::master::Call::GET_MASTER:
      return getMaster(call, principal, acceptType);

    case mesos::master::Call::SUBSCRIBE:
      return subscribe(call, principal, acceptType);

    case mesos::master::Call::RESERVE_RESOURCES:
      return reserveResources(call, principal, acceptType);

    case mesos::master::Call::UNRESERVE_RESOURCES:
      return unreserveResources(call, principal, acceptType);

    case mesos::master::Call::CREATE_VOLUMES:
      return createVolumes(call, principal, acceptType);

    case mesos::master::Call::DESTROY_VOLUMES:
      return destroyVolumes(call, principal, acceptType);

    case mesos::master::Call::GET_MAINTENANCE_STATUS:
      return getMaintenanceStatus(call, principal, acceptType);

    case mesos::master::Call::GET_MAINTENANCE_SCHEDULE:
      return getMaintenanceSchedule(call, principal, acceptType);

    case mesos::master::Call::UPDATE_MAINTENANCE_SCHEDULE:
      return updateMaintenanceSchedule(call, principal, acceptType);

    case mesos::master::Call::START_MAINTENANCE:
      return startMaintenance(call, principal, acceptType);

    case mesos::master::Call::STOP_MAINTENANCE:
      return stopMaintenance(call, principal, acceptType);

    case mesos::master::Call::GET_QUOTA:
      return quotaHandler.status(call, principal, acceptType);

    // The quota mutation handlers are not passed the accept type.
    case mesos::master::Call::SET_QUOTA:
      return quotaHandler.set(call, principal);

    case mesos::master::Call::REMOVE_QUOTA:
      return quotaHandler.remove(call, principal);
  }

  UNREACHABLE();
}
// Handles the operator API `SUBSCRIBE` call.
//
// Once the approvers for viewing frameworks, tasks and executors and
// the acceptor for viewing roles are available, a streaming (pipe)
// response is set up, the connection is registered with the master,
// and a `SUBSCRIBED` event carrying the state built by `_getState` is
// sent on it.
Future<Response> Master::Http::subscribe(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::SUBSCRIBE, call.type());

  // Retrieve Approvers for authorizing frameworks and tasks.
  Future<Owned<ObjectApprover>> viewFrameworks;
  Future<Owned<ObjectApprover>> viewTasks;
  Future<Owned<ObjectApprover>> viewExecutors;

  if (master->authorizer.isNone()) {
    // Without an authorizer everything is visible.
    viewFrameworks = Owned<ObjectApprover>(new AcceptingObjectApprover());
    viewTasks = Owned<ObjectApprover>(new AcceptingObjectApprover());
    viewExecutors = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    viewFrameworks = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);

    viewTasks = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_TASK);

    viewExecutors = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_EXECUTOR);
  }

  Future<Owned<AuthorizationAcceptor>> viewRoles =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_ROLE);

  return collect(viewFrameworks, viewTasks, viewExecutors, viewRoles)
    .then(defer(
        master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<AuthorizationAcceptor>>& approvers)
          -> Future<Response> {
          // Unpack the collected approvers, in the order they were
          // passed to `collect`.
          Owned<ObjectApprover> frameworksApprover = std::get<0>(approvers);
          Owned<ObjectApprover> tasksApprover = std::get<1>(approvers);
          Owned<ObjectApprover> executorsApprover = std::get<2>(approvers);
          Owned<AuthorizationAcceptor> rolesAcceptor = std::get<3>(approvers);

          // The response body is the read end of a pipe; events are
          // written to the other end through the `HttpConnection`.
          Pipe pipe;

          OK response;
          response.headers["Content-Type"] = stringify(contentType);
          response.type = Response::PIPE;
          response.reader = pipe.reader();

          HttpConnection http{pipe.writer(), contentType, UUID::random()};
          master->subscribe(http);

          mesos::master::Event subscribed;
          subscribed.set_type(mesos::master::Event::SUBSCRIBED);
          subscribed.mutable_subscribed()->mutable_get_state()->CopyFrom(
              _getState(
                  frameworksApprover,
                  tasksApprover,
                  executorsApprover,
                  rolesAcceptor));

          http.send<mesos::master::Event, v1::master::Event>(subscribed);

          return response;
        }));
}
// TODO(ijimenez): Add some information or pointers to help
// users understand the HTTP Event/Call API.
//
// Builds the help text for the scheduler API endpoint.
string Master::Http::SCHEDULER_HELP()
{
  const auto summary = TLDR(
      "Endpoint for schedulers to make calls against the master.");

  const auto details = DESCRIPTION(
      "Returns 202 Accepted iff the request is accepted.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.");

  const auto authenticationNote = AUTHENTICATION(true);

  const auto authorizationNote = AUTHORIZATION(
      "The returned frameworks information might be filtered based on the",
      "users authorization.",
      "See the authorization documentation for details.");

  return HELP(summary, details, authenticationNote, authorizationNote);
}
// Entry point for calls made by HTTP schedulers.
//
// After the common request checks (principal sanity, leadership,
// recovery state, HTTP method, body decoding and call validation):
//  - a `SUBSCRIBE` call opens a streaming response, generates a stream
//    ID that is returned in the 'Mesos-Stream-Id' header, and hands the
//    connection to the master;
//  - any other call must name a known, connected HTTP framework and
//    carry that framework's current stream ID, and is then forwarded
//    to the matching master operation.
Future<Response> Master::Http::scheduler(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalLacksValue =
    principal.isSome() && principal->value.isNone();

  if (principalLacksValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // TODO(vinod): Add metrics for rejected requests.

  // TODO(vinod): Add support for rate limiting.

  // When current master is not the leader, redirect to the leading master.
  // Note that this could happen if the scheduler realizes this is the
  // leading master before the master itself realizes it, e.g., due to
  // a ZooKeeper watch delay.
  if (!master->elected()) {
    return redirect(request);
  }

  CHECK_SOME(master->recovered);

  if (!master->recovered.get().isReady()) {
    return ServiceUnavailable("Master has not finished recovery");
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  v1::scheduler::Call v1Call;

  // TODO(anand): Content type values are case-insensitive.
  Option<string> contentTypeHeader = request.headers.get("Content-Type");

  if (contentTypeHeader.isNone()) {
    return BadRequest("Expecting 'Content-Type' to be present");
  }

  const string& mediaType = contentTypeHeader.get();

  if (mediaType == APPLICATION_PROTOBUF) {
    // Binary protobuf body.
    const bool parsed = v1Call.ParseFromString(request.body);
    if (!parsed) {
      return BadRequest("Failed to parse body into Call protobuf");
    }
  } else if (mediaType == APPLICATION_JSON) {
    // JSON body: parse the text first, then convert to protobuf.
    Try<JSON::Value> parsedJson = JSON::parse(request.body);
    if (parsedJson.isError()) {
      return BadRequest(
          "Failed to parse body into JSON: " + parsedJson.error());
    }

    Try<v1::scheduler::Call> converted =
      ::protobuf::parse<v1::scheduler::Call>(parsedJson.get());

    if (converted.isError()) {
      return BadRequest("Failed to convert JSON into Call protobuf: " +
                        converted.error());
    }

    v1Call = converted.get();
  } else {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' of ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
  }

  scheduler::Call call = devolve(v1Call);

  Option<Error> invalid =
    validation::scheduler::call::validate(call, principal);

  if (invalid.isSome()) {
    return BadRequest("Failed to validate scheduler::Call: " +
                      invalid.get().message);
  }

  if (call.type() == scheduler::Call::SUBSCRIBE) {
    // We default to JSON 'Content-Type' in the response since an
    // empty 'Accept' header results in all media types considered
    // acceptable.
    const bool acceptsJson = request.acceptsMediaType(APPLICATION_JSON);

    if (!acceptsJson && !request.acceptsMediaType(APPLICATION_PROTOBUF)) {
      return NotAcceptable(
          string("Expecting 'Accept' to allow ") +
          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
    }

    const ContentType acceptType =
      acceptsJson ? ContentType::JSON : ContentType::PROTOBUF;

    // Make sure that a stream ID was not included in the request headers.
    if (request.headers.contains("Mesos-Stream-Id")) {
      return BadRequest(
          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
    }

    const FrameworkInfo& frameworkInfo = call.subscribe().framework_info();

    // We allow an authenticated framework to not specify a principal in
    // `FrameworkInfo`, but in that case we log a WARNING here. We also set
    // `FrameworkInfo.principal` to the value of the authenticated principal
    // and use it for authorization later.
    //
    // NOTE: Common validation code, called previously, verifies that the
    // authenticated principal is the same as `FrameworkInfo.principal`,
    // if present.
    if (principal.isSome() && !frameworkInfo.has_principal()) {
      CHECK_SOME(principal->value);

      LOG(WARNING)
        << "Setting 'principal' in FrameworkInfo to '" << principal->value.get()
        << "' because the framework authenticated with that principal but "
        << "did not set it in FrameworkInfo";

      call.mutable_subscribe()->mutable_framework_info()->set_principal(
          principal->value.get());
    }

    // The response body is the read end of a pipe; the master writes
    // events to the other end through the `HttpConnection`.
    Pipe pipe;

    OK response;
    response.headers["Content-Type"] = stringify(acceptType);
    response.type = Response::PIPE;
    response.reader = pipe.reader();

    // Generate a stream ID and return it in the response.
    UUID streamId = UUID::random();
    response.headers["Mesos-Stream-Id"] = streamId.toString();

    HttpConnection http {pipe.writer(), acceptType, streamId};
    master->subscribe(http, call.subscribe());

    return response;
  }

  // We consolidate the framework lookup logic here because it is
  // common for all the call handlers.
  Framework* framework = master->getFramework(call.framework_id());

  if (framework == nullptr) {
    return BadRequest("Framework cannot be found");
  }

  // TODO(greggomann): Move this implicit scheduler authorization
  // into the authorizer. See MESOS-7399.
  if (principal.isSome() && principal != framework->info.principal()) {
    return BadRequest(
        "Authenticated principal '" + stringify(principal.get()) + "' does not "
        "match principal '" + framework->info.principal() + "' set in "
        "`FrameworkInfo`");
  }

  if (!framework->connected()) {
    return Forbidden("Framework is not subscribed");
  }

  if (framework->http.isNone()) {
    return Forbidden("Framework is not connected via HTTP");
  }

  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
  const Option<string> requestStreamId =
    request.headers.get("Mesos-Stream-Id");

  if (requestStreamId.isNone()) {
    return BadRequest(
        "All non-subscribe calls should include the 'Mesos-Stream-Id' header");
  }

  const string& streamId = requestStreamId.get();
  const string expectedStreamId = framework->http.get().streamId.toString();

  if (streamId != expectedStreamId) {
    return BadRequest(
        "The stream ID '" + streamId + "' included in this request "
        "didn't match the stream ID currently associated with framework ID "
        + framework->id().value());
  }

  switch (call.type()) {
    case scheduler::Call::SUBSCRIBE:
      // SUBSCRIBE call should have been handled above.
      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";

    case scheduler::Call::TEARDOWN:
      master->removeFramework(framework);
      return Accepted();

    case scheduler::Call::ACCEPT:
      master->accept(framework, call.accept());
      return Accepted();

    case scheduler::Call::DECLINE:
      master->decline(framework, call.decline());
      return Accepted();

    case scheduler::Call::ACCEPT_INVERSE_OFFERS:
      master->acceptInverseOffers(framework, call.accept_inverse_offers());
      return Accepted();

    case scheduler::Call::DECLINE_INVERSE_OFFERS:
      master->declineInverseOffers(framework, call.decline_inverse_offers());
      return Accepted();

    case scheduler::Call::REVIVE:
      master->revive(framework, call.revive());
      return Accepted();

    case scheduler::Call::SUPPRESS:
      master->suppress(framework, call.suppress());
      return Accepted();

    case scheduler::Call::KILL:
      master->kill(framework, call.kill());
      return Accepted();

    case scheduler::Call::SHUTDOWN:
      master->shutdown(framework, call.shutdown());
      return Accepted();

    case scheduler::Call::ACKNOWLEDGE:
      master->acknowledge(framework, call.acknowledge());
      return Accepted();

    case scheduler::Call::RECONCILE:
      master->reconcile(framework, call.reconcile());
      return Accepted();

    case scheduler::Call::MESSAGE:
      master->message(framework, call.message());
      return Accepted();

    case scheduler::Call::REQUEST:
      master->request(framework, call.request());
      return Accepted();

    case scheduler::Call::UNKNOWN:
      LOG(WARNING) << "Received 'UNKNOWN' call";
      return NotImplemented();
  }

  // Reached only for a call type not listed in the switch above.
  return NotImplemented();
}
// Returns a copy of `resources` in which the `disk` field of every
// individual resource has been cleared. The input is left untouched.
static Resources removeDiskInfos(const Resources& resources)
{
  Resources stripped;

  foreach (const Resource& original, resources) {
    // Work on a private copy so that the caller's resource is not modified.
    Resource withoutDisk = original;
    withoutDisk.clear_disk();

    stripped += withoutDisk;
  }

  return stripped;
}
// Builds the help text for the "create persistent volumes" endpoint.
//
// The text is assembled from four parts: a one-line summary, a
// description of the HTTP status codes that can be returned and of the
// "slaveId"/"volumes" values the caller has to provide, a flag stating
// that authentication applies, and a note on the required authorization.
string Master::Http::CREATE_VOLUMES_HELP()
{
return HELP(
TLDR(
"Create persistent volumes on reserved resources."),
DESCRIPTION(
"Returns 202 ACCEPTED which indicates that the create",
"operation has been validated successfully by the master.",
"",
"Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
"current master is not the leader.",
"",
"Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
"found.",
"",
"The request is then forwarded asynchronously to the Mesos",
"agent where the reserved resources are located.",
"That asynchronous message may not be delivered or",
"creating the volumes at the agent might fail.",
"",
"Please provide \"slaveId\" and \"volumes\" values describing",
"the volumes to be created."),
AUTHENTICATION(true),
AUTHORIZATION(
"Using this endpoint to create persistent volumes requires that",
"the current principal is authorized to create volumes for the",
"specific role.",
"See the authorization documentation for details."));
}
// Handles an HTTP request asking for persistent volumes to be created.
//
// The request body is decoded as a query string which has to carry a
// 'slaveId' value and a 'volumes' value; the latter must be a JSON array
// whose elements each parse as a `Resource`. Malformed input is answered
// with `BadRequest`, a non-POST method with `MethodNotAllowed`, and a
// request reaching a non-elected master is redirected. Well-formed
// requests are handed over to `_createVolumes`.
Future<Response> Master::Http::createVolumes(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalHasNoValue =
    principal.isSome() && principal->value.isNone();

  if (principalHasNoValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // Only the leading master serves this request; everyone else redirects.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // The parameters travel as a query string inside the request body.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  const Option<string> agentValue = parameters.get("slaveId");
  if (agentValue.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(agentValue.get());

  const Option<string> volumesValue = parameters.get("volumes");
  if (volumesValue.isNone()) {
    return BadRequest("Missing 'volumes' query parameter in the request body");
  }

  Try<JSON::Array> volumesJson = JSON::parse<JSON::Array>(volumesValue.get());
  if (volumesJson.isError()) {
    return BadRequest(
        "Error in parsing 'volumes' query parameter in the request body: " +
        volumesJson.error());
  }

  // Convert every JSON element into a `Resource`, bailing out on the
  // first element that cannot be converted.
  RepeatedPtrField<Resource> volumes;
  foreach (const JSON::Value& element, volumesJson.get().values) {
    Try<Resource> resource = ::protobuf::parse<Resource>(element);
    if (resource.isError()) {
      return BadRequest(
          "Error in parsing 'volumes' query parameter in the request body: " +
          resource.error());
    }

    volumes.Add()->CopyFrom(resource.get());
  }

  return _createVolumes(slaveId, volumes, principal);
}
// Shared back end of both `createVolumes` entry points.
//
// Looks up the registered agent, wraps `volumes` into a CREATE offer
// operation, validates that operation and asks the master to authorize
// it. `BadRequest` is returned for an unknown agent or a failed
// validation, `Forbidden` when authorization is denied; otherwise the
// result of `_operation` is returned.
Future<Response> Master::Http::_createVolumes(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& volumes,
    const Option<Principal>& principal) const
{
  Slave* const slave = master->slaves.registered.get(slaveId);
  if (slave == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Describe the request as an offer operation.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::CREATE);
  operation.mutable_create()->mutable_volumes()->CopyFrom(volumes);

  const Option<Error> normalizationError =
    validateAndNormalizeResources(&operation);

  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  const Option<Error> validationError = validation::operation::validate(
      operation.create(),
      slave->checkpointedResources,
      principal,
      slave->capabilities);

  if (validationError.isSome()) {
    return BadRequest(
        "Invalid CREATE operation on agent " + stringify(*slave) + ": " +
        validationError->message);
  }

  return master->authorizeCreateVolume(operation.create(), principal)
    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
      if (authorized) {
        // The resources required for this operation are equivalent to the
        // volumes specified by the user minus any DiskInfo (DiskInfo will
        // be created when this operation is applied).
        return _operation(
            slaveId, removeDiskInfos(operation.create().volumes()), operation);
      }

      return Forbidden();
    }));
}
// Operator API variant of `createVolumes`: extracts the agent ID and the
// volumes from a CREATE_VOLUMES call and delegates to `_createVolumes`.
// The content type is not used.
Future<Response> Master::Http::createVolumes(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalHasNoValue =
    principal.isSome() && principal->value.isNone();

  if (principalHasNoValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  CHECK_EQ(mesos::master::Call::CREATE_VOLUMES, call.type());
  CHECK(call.has_create_volumes());

  const auto& payload = call.create_volumes();

  return _createVolumes(payload.slave_id(), payload.volumes(), principal);
}
// Builds the help text for the "destroy persistent volumes" endpoint.
//
// The text is assembled from four parts: a one-line summary, a
// description of the HTTP status codes that can be returned and of the
// "slaveId"/"volumes" values the caller has to provide, a flag stating
// that authentication applies, and a note on the required authorization.
string Master::Http::DESTROY_VOLUMES_HELP()
{
return HELP(
TLDR(
"Destroy persistent volumes."),
DESCRIPTION(
"Returns 202 ACCEPTED which indicates that the destroy",
"operation has been validated successfully by the master.",
"",
"Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
"current master is not the leader.",
"",
"Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
"found.",
"",
"The request is then forwarded asynchronously to the Mesos",
"agent where the reserved resources are located.",
"That asynchronous message may not be delivered or",
"destroying the volumes at the agent might fail.",
"",
"Please provide \"slaveId\" and \"volumes\" values describing",
"the volumes to be destroyed."),
AUTHENTICATION(true),
AUTHORIZATION(
"Using this endpoint to destroy persistent volumes requires that",
"the current principal is authorized to destroy volumes created",
"by the principal who created the volume.",
"See the authorization documentation for details."));
}
// Handles an HTTP request asking for persistent volumes to be destroyed.
//
// The request body is decoded as a query string which has to carry a
// 'slaveId' value and a 'volumes' value; the latter must be a JSON array
// whose elements each parse as a `Resource`. Malformed input is answered
// with `BadRequest`, a non-POST method with `MethodNotAllowed`, and a
// request reaching a non-elected master is redirected. Well-formed
// requests are handed over to `_destroyVolumes`.
Future<Response> Master::Http::destroyVolumes(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalHasNoValue =
    principal.isSome() && principal->value.isNone();

  if (principalHasNoValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // Only the leading master serves this request; everyone else redirects.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // The parameters travel as a query string inside the request body.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  const Option<string> agentValue = parameters.get("slaveId");
  if (agentValue.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(agentValue.get());

  const Option<string> volumesValue = parameters.get("volumes");
  if (volumesValue.isNone()) {
    return BadRequest("Missing 'volumes' query parameter in the request body");
  }

  Try<JSON::Array> volumesJson = JSON::parse<JSON::Array>(volumesValue.get());
  if (volumesJson.isError()) {
    return BadRequest(
        "Error in parsing 'volumes' query parameter in the request body: " +
        volumesJson.error());
  }

  // Convert every JSON element into a `Resource`, bailing out on the
  // first element that cannot be converted.
  RepeatedPtrField<Resource> volumes;
  foreach (const JSON::Value& element, volumesJson.get().values) {
    Try<Resource> resource = ::protobuf::parse<Resource>(element);
    if (resource.isError()) {
      return BadRequest(
          "Error in parsing 'volumes' query parameter in the request body: " +
          resource.error());
    }

    volumes.Add()->CopyFrom(resource.get());
  }

  return _destroyVolumes(slaveId, volumes, principal);
}
// Shared back end of both `destroyVolumes` entry points.
//
// Looks up the registered agent, wraps `volumes` into a DESTROY offer
// operation, validates that operation against the agent's checkpointed
// resources, used resources and pending tasks, and asks the master to
// authorize it. `BadRequest` is returned for an unknown agent or a failed
// validation, `Forbidden` when authorization is denied; otherwise the
// result of `_operation` is returned.
Future<Response> Master::Http::_destroyVolumes(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& volumes,
    const Option<Principal>& principal) const
{
  Slave* const slave = master->slaves.registered.get(slaveId);
  if (slave == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Describe the request as an offer operation.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::DESTROY);
  operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);

  const Option<Error> normalizationError =
    validateAndNormalizeResources(&operation);

  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  const Option<Error> validationError = validation::operation::validate(
      operation.destroy(),
      slave->checkpointedResources,
      slave->usedResources,
      slave->pendingTasks);

  if (validationError.isSome()) {
    return BadRequest(
        "Invalid DESTROY operation: " + validationError->message);
  }

  return master->authorizeDestroyVolume(operation.destroy(), principal)
    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
      if (authorized) {
        return _operation(slaveId, operation.destroy().volumes(), operation);
      }

      return Forbidden();
    }));
}
// Operator API variant of `destroyVolumes`: extracts the agent ID and the
// volumes from a DESTROY_VOLUMES call and delegates to `_destroyVolumes`.
// The content type is not used.
Future<Response> Master::Http::destroyVolumes(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalHasNoValue =
    principal.isSome() && principal->value.isNone();

  if (principalHasNoValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  CHECK_EQ(mesos::master::Call::DESTROY_VOLUMES, call.type());
  CHECK(call.has_destroy_volumes());

  const auto& payload = call.destroy_volumes();

  return _destroyVolumes(payload.slave_id(), payload.volumes(), principal);
}
// Builds the help text for the endpoint exposing framework information.
//
// The text consists of a one-line summary, a description of the HTTP
// status codes that can be returned and of the optional `framework_id`
// query parameter, a flag stating that authentication applies, and a
// note that the output may be filtered by authorization.
string Master::Http::FRAMEWORKS_HELP()
{
return HELP(
TLDR("Exposes the frameworks info."),
DESCRIPTION(
"Returns 200 OK when the frameworks info was queried successfully.",
"",
"Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
"current master is not the leader.",
"",
"Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
"found.",
"",
"Query parameters:",
"> framework_id=VALUE The ID of the framework returned "
"(if no framework ID is specified, all frameworks will be returned)."),
AUTHENTICATION(true),
AUTHORIZATION(
"This endpoint might be filtered based on the user accessing it.",
"See the authorization documentation for details."));
}
// Serves the JSON listing of frameworks known to this master.
//
// The response object has three fields:
//   - "frameworks": the entries of `master->frameworks.registered`,
//   - "completed_frameworks": the entries of `master->frameworks.completed`,
//   - "unregistered_frameworks": always an empty array.
//
// A framework is only emitted if it passes the VIEW_FRAMEWORK acceptor and,
// when a `framework_id` query parameter is present, the ID acceptor built
// from it. The VIEW_TASK and VIEW_EXECUTOR acceptors are forwarded to
// `FullFrameworkWriter`. If the request carries a `jsonp` query parameter
// its value is passed along to `OK`.
//
// A non-elected master redirects the request instead of answering it.
Future<Response> Master::Http::frameworks(
const Request& request,
const Option<Principal>& principal) const
{
// TODO(greggomann): Remove this check once the `Principal` type is used in
// `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
// See MESOS-7202.
if (principal.isSome() && principal->value.isNone()) {
return Forbidden(
"The request's authenticated principal contains claims, but no value "
"string. The master currently requires that principals have a value");
}
// When current master is not the leader, redirect to the leading master.
if (!master->elected()) {
return redirect(request);
}
// One acceptor per authorization action that influences the output.
Future<Owned<AuthorizationAcceptor>> authorizeFrameworkInfo =
AuthorizationAcceptor::create(
principal, master->authorizer, authorization::VIEW_FRAMEWORK);
Future<Owned<AuthorizationAcceptor>> authorizeTask =
AuthorizationAcceptor::create(
principal, master->authorizer, authorization::VIEW_TASK);
Future<Owned<AuthorizationAcceptor>> authorizeExecutorInfo =
AuthorizationAcceptor::create(
principal, master->authorizer, authorization::VIEW_EXECUTOR);
// Built from the optional `framework_id` query parameter.
Future<IDAcceptor<FrameworkID>> selectFrameworkId =
IDAcceptor<FrameworkID>(request.url.query.get("framework_id"));
// Wait for all acceptors, then render the response via the master's
// process (`defer(master->self(), ...)`).
return collect(
authorizeFrameworkInfo,
authorizeTask,
authorizeExecutorInfo,
selectFrameworkId)
.then(defer(master->self(),
[this, request](const tuple<Owned<AuthorizationAcceptor>,
Owned<AuthorizationAcceptor>,
Owned<AuthorizationAcceptor>,
IDAcceptor<FrameworkID>>& acceptors)
-> Response {
// This lambda is consumed before the outer lambda
// returns, hence capture by reference is fine here.
auto frameworks = [this, &acceptors](JSON::ObjectWriter* writer) {
// Unpack the collected acceptors into named locals; the inner
// lambdas below capture these locals by reference.
Owned<AuthorizationAcceptor> authorizeFrameworkInfo;
Owned<AuthorizationAcceptor> authorizeTask;
Owned<AuthorizationAcceptor> authorizeExecutorInfo;
IDAcceptor<FrameworkID> selectFrameworkId;
tie(authorizeFrameworkInfo,
authorizeTask,
authorizeExecutorInfo,
selectFrameworkId) = acceptors;
// Model all of the frameworks.
writer->field(
"frameworks",
[this,
&authorizeFrameworkInfo,
&authorizeTask,
&authorizeExecutorInfo,
&selectFrameworkId](JSON::ArrayWriter* writer) {
foreachvalue (Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks or frameworks without a matching ID.
if (!selectFrameworkId.accept(framework->id()) ||
!authorizeFrameworkInfo->accept(framework->info)) {
continue;
}
FullFrameworkWriter frameworkWriter(
authorizeTask,
authorizeExecutorInfo,
framework);
writer->element(frameworkWriter);
}
});
// Model all of the completed frameworks.
writer->field(
"completed_frameworks",
[this,
&authorizeFrameworkInfo,
&authorizeTask,
&authorizeExecutorInfo,
&selectFrameworkId](JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks or frameworks without a matching ID.
if (!selectFrameworkId.accept(framework->id()) ||
!authorizeFrameworkInfo->accept(framework->info)) {
continue;
}
FullFrameworkWriter frameworkWriter(
authorizeTask,
authorizeExecutorInfo,
framework.get());
writer->element(frameworkWriter);
}
});
// Unregistered frameworks are no longer possible. We emit an
// empty array for the sake of backward compatibility.
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
return OK(jsonify(frameworks), request.url.query.get("jsonp"));
}));
}
// Converts the master's in-memory `Framework` into its
// `GetFrameworks::Framework` protobuf representation.
//
// The framework info and the active/connected/recovered flags are always
// copied. Each of the three timestamps is only set when its nanosecond
// value is non-zero. Offers and inverse offers are copied as they are,
// whereas used and offered resources are first passed through
// `convertResourceFormat(..., ENDPOINT)`.
mesos::master::Response::GetFrameworks::Framework model(
    const Framework& framework)
{
  mesos::master::Response::GetFrameworks::Framework result;

  result.mutable_framework_info()->CopyFrom(framework.info);
  result.set_active(framework.active());
  result.set_connected(framework.connected());
  result.set_recovered(framework.recovered());

  const int64_t registeredNs = framework.registeredTime.duration().ns();
  if (registeredNs != 0) {
    result.mutable_registered_time()->set_nanoseconds(registeredNs);
  }

  const int64_t unregisteredNs = framework.unregisteredTime.duration().ns();
  if (unregisteredNs != 0) {
    result.mutable_unregistered_time()->set_nanoseconds(unregisteredNs);
  }

  const int64_t reregisteredNs = framework.reregisteredTime.duration().ns();
  if (reregisteredNs != 0) {
    result.mutable_reregistered_time()->set_nanoseconds(reregisteredNs);
  }

  foreach (const Offer* offer, framework.offers) {
    result.add_offers()->CopyFrom(*offer);
  }

  foreach (const InverseOffer* inverseOffer, framework.inverseOffers) {
    result.add_inverse_offers()->CopyFrom(*inverseOffer);
  }

  // The loops below deliberately iterate by value: the format conversion
  // mutates its argument and must not touch the framework's own state.
  foreach (Resource used, framework.totalUsedResources) {
    convertResourceFormat(&used, ENDPOINT);
    result.add_allocated_resources()->CopyFrom(used);
  }

  foreach (Resource offered, framework.totalOfferedResources) {
    convertResourceFormat(&offered, ENDPOINT);
    result.add_offered_resources()->CopyFrom(offered);
  }

  return result;
}
// Answers a GET_FRAMEWORKS call with the frameworks the principal is
// allowed to view, serialized according to `contentType`.
Future<Response> Master::Http::getFrameworks(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());

  // Without an authorizer every framework is visible; otherwise ask the
  // authorizer for a VIEW_FRAMEWORK approver for this principal.
  Future<Owned<ObjectApprover>> pendingApprover;

  if (master->authorizer.isNone()) {
    pendingApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    pendingApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);
  }

  return pendingApprover
    .then(defer(
        master->self(),
        [=](const Owned<ObjectApprover>& approver) -> Future<Response> {
          mesos::master::Response reply;
          reply.set_type(mesos::master::Response::GET_FRAMEWORKS);
          reply.mutable_get_frameworks()->CopyFrom(_getFrameworks(approver));

          return OK(serialize(contentType, evolve(reply)),
                    stringify(contentType));
        }));
}
// Collects the protobuf models of all registered and completed
// frameworks that `frameworksApprover` allows to be viewed.
mesos::master::Response::GetFrameworks Master::Http::_getFrameworks(
    const Owned<ObjectApprover>& frameworksApprover) const
{
  mesos::master::Response::GetFrameworks listing;

  foreachvalue (const Framework* registered,
                master->frameworks.registered) {
    // Only authorized frameworks make it into the listing.
    if (approveViewFrameworkInfo(frameworksApprover, registered->info)) {
      listing.add_frameworks()->CopyFrom(model(*registered));
    }
  }

  foreachvalue (const Owned<Framework>& completed,
                master->frameworks.completed) {
    // Only authorized frameworks make it into the listing.
    if (approveViewFrameworkInfo(frameworksApprover, completed->info)) {
      listing.add_completed_frameworks()->CopyFrom(model(*completed.get()));
    }
  }

  return listing;
}
// Answers a GET_EXECUTORS call with the executors the principal is
// allowed to view, serialized according to `contentType`.
Future<Response> Master::Http::getExecutors(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());

  // Approvers for viewing frameworks and executors. Without an authorizer
  // everything is accepted.
  Future<Owned<ObjectApprover>> pendingFrameworksApprover;
  Future<Owned<ObjectApprover>> pendingExecutorsApprover;

  if (master->authorizer.isNone()) {
    pendingFrameworksApprover =
      Owned<ObjectApprover>(new AcceptingObjectApprover());
    pendingExecutorsApprover =
      Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    pendingFrameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);
    pendingExecutorsApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_EXECUTOR);
  }

  return collect(pendingFrameworksApprover, pendingExecutorsApprover)
    .then(defer(
        master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>>& approvers)
          -> Future<Response> {
          // Unpack the approvers in the order they were collected.
          const Owned<ObjectApprover>& viewFrameworks = std::get<0>(approvers);
          const Owned<ObjectApprover>& viewExecutors = std::get<1>(approvers);

          mesos::master::Response reply;
          reply.set_type(mesos::master::Response::GET_EXECUTORS);
          reply.mutable_get_executors()->CopyFrom(
              _getExecutors(viewFrameworks, viewExecutors));

          return OK(serialize(contentType, evolve(reply)),
                    stringify(contentType));
        }));
}
// Lists the executors of every viewable framework.
//
// First the registered and then the completed frameworks are filtered
// with `frameworksApprover`; afterwards each executor of the remaining
// frameworks is filtered with `executorsApprover`. Every surviving
// executor is reported together with the ID of the agent it is keyed by.
mesos::master::Response::GetExecutors Master::Http::_getExecutors(
    const Owned<ObjectApprover>& frameworksApprover,
    const Owned<ObjectApprover>& executorsApprover) const
{
  // Frameworks (active and completed) the principal may look at.
  vector<const Framework*> visible;

  auto keepIfVisible = [&](const Framework* candidate) {
    if (approveViewFrameworkInfo(frameworksApprover, candidate->info)) {
      visible.push_back(candidate);
    }
  };

  foreachvalue (Framework* registered, master->frameworks.registered) {
    keepIfVisible(registered);
  }

  foreachvalue (const Owned<Framework>& completed,
                master->frameworks.completed) {
    keepIfVisible(completed.get());
  }

  mesos::master::Response::GetExecutors listing;

  foreach (const Framework* framework, visible) {
    foreachpair (const SlaveID& slaveId,
                 const auto& executorsOnAgent,
                 framework->executors) {
      foreachvalue (const ExecutorInfo& executorInfo, executorsOnAgent) {
        const bool viewable = approveViewExecutorInfo(
            executorsApprover, executorInfo, framework->info);

        // Unauthorized executors are left out.
        if (!viewable) {
          continue;
        }

        mesos::master::Response::GetExecutors::Executor* entry =
          listing.add_executors();

        entry->mutable_executor_info()->CopyFrom(executorInfo);
        entry->mutable_slave_id()->CopyFrom(slaveId);
      }
    }
  }

  return listing;
}
// Answers a GET_STATE call with the combined state produced by
// `_getState`, serialized according to `contentType`.
Future<Response> Master::Http::getState(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_STATE, call.type());

  // Approvers for viewing frameworks, tasks and executors. Without an
  // authorizer everything is accepted.
  Future<Owned<ObjectApprover>> pendingFrameworksApprover;
  Future<Owned<ObjectApprover>> pendingTasksApprover;
  Future<Owned<ObjectApprover>> pendingExecutorsApprover;

  if (master->authorizer.isNone()) {
    pendingFrameworksApprover =
      Owned<ObjectApprover>(new AcceptingObjectApprover());
    pendingTasksApprover =
      Owned<ObjectApprover>(new AcceptingObjectApprover());
    pendingExecutorsApprover =
      Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    pendingFrameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);
    pendingTasksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_TASK);
    pendingExecutorsApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_EXECUTOR);
  }

  // Roles are filtered through an acceptor rather than an approver.
  Future<Owned<AuthorizationAcceptor>> pendingRolesAcceptor =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_ROLE);

  return collect(
      pendingFrameworksApprover,
      pendingTasksApprover,
      pendingExecutorsApprover,
      pendingRolesAcceptor)
    .then(defer(
        master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<AuthorizationAcceptor>>& approvers)
          -> Future<Response> {
          // Unpack the tuple in the order it was collected.
          const Owned<ObjectApprover>& viewFrameworks = std::get<0>(approvers);
          const Owned<ObjectApprover>& viewTasks = std::get<1>(approvers);
          const Owned<ObjectApprover>& viewExecutors = std::get<2>(approvers);
          const Owned<AuthorizationAcceptor>& viewRoles =
            std::get<3>(approvers);

          mesos::master::Response reply;
          reply.set_type(mesos::master::Response::GET_STATE);
          reply.mutable_get_state()->CopyFrom(
              _getState(viewFrameworks, viewTasks, viewExecutors, viewRoles));

          return OK(
              serialize(contentType, evolve(reply)), stringify(contentType));
        }));
}
// Assembles the `GetState` message out of the task, executor, framework
// and agent listings, each filtered by the matching approver/acceptor.
mesos::master::Response::GetState Master::Http::_getState(
    const Owned<ObjectApprover>& frameworksApprover,
    const Owned<ObjectApprover>& tasksApprover,
    const Owned<ObjectApprover>& executorsApprover,
    const Owned<AuthorizationAcceptor>& rolesAcceptor) const
{
  // NOTE: This function must be blocking instead of returning a
  // `Future`. This is because `subscribe()` needs to atomically
  // add subscriber to `subscribers` map and send the captured state
  // in `SUBSCRIBED` without being interleaved by any other events.
  mesos::master::Response::GetState state;

  state.mutable_get_tasks()->CopyFrom(
      _getTasks(frameworksApprover, tasksApprover));

  state.mutable_get_executors()->CopyFrom(
      _getExecutors(frameworksApprover, executorsApprover));

  state.mutable_get_frameworks()->CopyFrom(
      _getFrameworks(frameworksApprover));

  state.mutable_get_agents()->CopyFrom(_getAgents(rolesAcceptor));

  return state;
}
// Error type produced when the master's flags cannot be returned to a
// caller. Besides what it inherits from `Error`, it records the category
// of the failure in `type` and an optional free-form `message`.
class Master::Http::FlagsError : public Error
{
public:
// Category of the failure. Currently the only category is a failed
// authorization.
enum Type
{
UNAUTHORIZED
};
// Constructs an error of the given category. The base `Error` receives
// `stringify(_type)`; this class's own `message` is left empty.
// TODO(arojas): Provide a proper string representation of the enum.
explicit FlagsError(Type _type)
: Error(stringify(_type)), type(_type) {}
// Constructs an error of the given category carrying `_message`. The base
// `Error` still receives `stringify(_type)`, not `_message`.
FlagsError(Type _type, const string& _message)
: Error(stringify(_type)), type(_type), message(_message) {}
const Type type;
// NOTE(review): `Error` appears to expose a `message` member of its own
// (other code in this file reads `error->message` on an `Option<Error>`).
// If so, this declaration hides it, and `flagsError.message` yields the
// value stored here rather than the stringified type -- confirm that this
// is intended.
const string message;
};
// Builds the help text for the endpoint exposing the master's flags.
//
// The text consists of a one-line summary, no description (`None()`),
// a flag stating that authentication applies, and a note on the
// authorization required to view the flags.
string Master::Http::FLAGS_HELP()
{
return HELP(
TLDR("Exposes the master's flag configuration."),
None(),
AUTHENTICATION(true),
AUTHORIZATION(
"Querying this endpoint requires that the current principal",
"is authorized to view all flags.",
"See the authorization documentation for details."));
}
// Serves the master's flags as JSON.
//
// The flags are obtained through `_flags`, which performs the
// authorization. An UNAUTHORIZED error becomes `Forbidden`, any other
// error `InternalServerError`. If the request carries a `jsonp` query
// parameter its value is passed along to `OK`.
Future<Response> Master::Http::flags(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool principalHasNoValue =
    principal.isSome() && principal->value.isNone();

  if (principalHasNoValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // TODO(nfnt): Remove check for enabled
  // authorization as part of MESOS-5346.
  const bool methodRejected =
    request.method != "GET" && master->authorizer.isSome();

  if (methodRejected) {
    return MethodNotAllowed({"GET"}, request.method);
  }

  Option<string> jsonp = request.url.query.get("jsonp");

  return _flags(principal)
    .then([jsonp](const Try<JSON::Object, FlagsError>& result)
        -> Future<Response> {
      if (result.isSome()) {
        return OK(result.get(), jsonp);
      }

      switch (result.error().type) {
        case FlagsError::Type::UNAUTHORIZED:
          return Forbidden();
      }

      return InternalServerError(result.error().message);
    });
}
// Returns the master's flags as a JSON object, subject to authorization.
//
// Without an authorizer the flags are returned right away. Otherwise a
// VIEW_FLAGS authorization request is issued for the principal, and the
// flags are only returned if it is granted; a denial yields an
// UNAUTHORIZED `FlagsError`.
Future<Try<JSON::Object, Master::Http::FlagsError>> Master::Http::_flags(
    const Option<Principal>& principal) const
{
  if (master->authorizer.isNone()) {
    return __flags();
  }

  authorization::Request authorizationRequest;
  authorizationRequest.set_action(authorization::VIEW_FLAGS);

  // The subject is only attached when one could be derived from the
  // principal.
  const Option<authorization::Subject> subject = createSubject(principal);
  if (subject.isSome()) {
    authorizationRequest.mutable_subject()->CopyFrom(subject.get());
  }

  return master->authorizer.get()->authorized(authorizationRequest)
    .then(defer(
        master->self(),
        [this](bool authorized) -> Future<Try<JSON::Object, FlagsError>> {
          if (!authorized) {
            return FlagsError(FlagsError::Type::UNAUTHORIZED);
          }

          return __flags();
        }));
}
// Renders the master's flags as `{"flags": {<name>: <value>, ...}}`.
// Flags for which `stringify` yields no value are omitted.
JSON::Object Master::Http::__flags() const
{
  JSON::Object flagValues;

  foreachvalue (const flags::Flag& flag, master->flags) {
    const Option<string> rendered = flag.stringify(master->flags);

    if (rendered.isNone()) {
      continue;
    }

    flagValues.values[flag.effective_name().value] = rendered.get();
  }

  JSON::Object object;
  object.values["flags"] = std::move(flagValues);

  return object;
}
// Answers a GET_FLAGS call with the master's flags, serialized according
// to `contentType`. An UNAUTHORIZED error from `_flags` becomes
// `Forbidden`, any other error `InternalServerError`.
Future<Response> Master::Http::getFlags(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_FLAGS, call.type());

  return _flags(principal)
    .then([contentType](const Try<JSON::Object, FlagsError>& result)
        -> Future<Response> {
      if (result.isSome()) {
        return OK(
            serialize(contentType,
                      evolve<v1::master::Response::GET_FLAGS>(result.get())),
            stringify(contentType));
      }

      switch (result.error().type) {
        case FlagsError::Type::UNAUTHORIZED:
          return Forbidden();
      }

      return InternalServerError(result.error().message);
    });
}
// Builds the help text for the health endpoint: a one-line summary, a
// short description of how to interpret the response, and a flag stating
// that no authentication applies.
string Master::Http::HEALTH_HELP()
{
return HELP(
TLDR(
"Health status of the Master."),
DESCRIPTION(
"Returns 200 OK iff the Master is healthy.",
"Delayed responses are also indicative of poor health."),
AUTHENTICATION(false));
}
// Health probe: unconditionally answers with an empty `OK` response. The
// request itself is not inspected.
Future<Response> Master::Http::health(const Request& request) const
{
return OK();
}
// Answers a GET_HEALTH call. The reply always reports `healthy = true`
// and is serialized according to `contentType`; the principal is not
// consulted.
Future<Response> Master::Http::getHealth(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_HEALTH, call.type());

  mesos::master::Response reply;
  reply.set_type(mesos::master::Response::GET_HEALTH);

  mesos::master::Response::GetHealth* health = reply.mutable_get_health();
  health->set_healthy(true);

  return OK(serialize(contentType, evolve(reply)),
            stringify(contentType));
}
// Answers a GET_VERSION call with the result of `version()`, evolved to
// the v1 response type and serialized according to `contentType`. The
// principal is not consulted.
Future<Response> Master::Http::getVersion(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_VERSION, call.type());

  return OK(
      serialize(
          contentType,
          evolve<v1::master::Response::GET_VERSION>(version())),
      stringify(contentType));
}
// Answers a GET_METRICS call with a snapshot of the process metrics.
//
// If the call specifies a timeout it is forwarded (in nanoseconds) to
// `process::metrics::snapshot`; otherwise no timeout is passed. Every
// name/value pair of the snapshot becomes one `Metric` in the reply.
Future<Response> Master::Http::getMetrics(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_METRICS, call.type());
  CHECK(call.has_get_metrics());

  const auto& parameters = call.get_metrics();

  Option<Duration> timeout;
  if (parameters.has_timeout()) {
    timeout = Nanoseconds(parameters.timeout().nanoseconds());
  }

  return process::metrics::snapshot(timeout)
    .then([contentType](const hashmap<string, double>& snapshot) -> Response {
      mesos::master::Response reply;
      reply.set_type(mesos::master::Response::GET_METRICS);

      mesos::master::Response::GetMetrics* listing =
        reply.mutable_get_metrics();

      foreachpair (const string& name, double reading, snapshot) {
        Metric* entry = listing->add_metrics();
        entry->set_name(name);
        entry->set_value(reading);
      }

      return OK(serialize(contentType, evolve(reply)),
                stringify(contentType));
    });
}
// Answers a GET_LOGGING_LEVEL call with the current value of `FLAGS_v`,
// serialized according to `contentType`. The principal is not consulted.
Future<Response> Master::Http::getLoggingLevel(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_LOGGING_LEVEL, call.type());

  mesos::master::Response reply;
  reply.set_type(mesos::master::Response::GET_LOGGING_LEVEL);

  mesos::master::Response::GetLoggingLevel* logging =
    reply.mutable_get_logging_level();
  logging->set_level(FLAGS_v);

  return OK(serialize(contentType, evolve(reply)),
            stringify(contentType));
}
// Handles a SET_LOGGING_LEVEL call.
//
// The requested level and duration are taken from the call. Once a
// SET_LOG_LEVEL approver (or an accept-everything approver when no
// authorizer is configured) has approved, the change is dispatched to the
// logging process and answered with `OK`. An approver error yields
// `InternalServerError`, a denial `Forbidden`. The content type is not
// used.
Future<Response> Master::Http::setLoggingLevel(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  CHECK_EQ(mesos::master::Call::SET_LOGGING_LEVEL, call.type());
  CHECK(call.has_set_logging_level());

  uint32_t level = call.set_logging_level().level();
  Duration duration =
    Nanoseconds(call.set_logging_level().duration().nanoseconds());

  Future<Owned<ObjectApprover>> pendingApprover;

  if (master->authorizer.isNone()) {
    pendingApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    pendingApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::SET_LOG_LEVEL);
  }

  return pendingApprover.then(
      [level, duration](const Owned<ObjectApprover>& approver)
        -> Future<Response> {
        Try<bool> verdict = approver->approved((ObjectApprover::Object()));

        if (verdict.isError()) {
          return InternalServerError(
              "Authorization error: " + verdict.error());
        }

        if (!verdict.get()) {
          return Forbidden();
        }

        return dispatch(
            process::logging(), &Logging::set_level, level, duration)
          .then([]() -> Response {
            return OK();
          });
      });
}
// Answers a GET_MASTER call with this master's `MasterInfo`, serialized
// according to `contentType`. The principal is not consulted.
Future<Response> Master::Http::getMaster(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_MASTER, call.type());

  // It is guaranteed that this master has been elected as the leader.
  CHECK(master->elected());

  mesos::master::Response reply;
  reply.set_type(mesos::master::Response::GET_MASTER);

  mesos::master::Response::GetMaster* getMaster = reply.mutable_get_master();
  getMaster->mutable_master_info()->CopyFrom(master->info());

  return OK(serialize(contentType, evolve(reply)),
            stringify(contentType));
}
// Builds the help text for the redirect endpoint: a one-line summary, a
// description of the HTTP status codes that can be returned together with
// usage notes, and a flag stating that no authentication applies.
string Master::Http::REDIRECT_HELP()
{
return HELP(
TLDR(
"Redirects to the leading Master."),
DESCRIPTION(
"Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
"current master is not the leader.",
"",
"Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
"found.",
"",
"**NOTES:**",
"1. This is the recommended way to bookmark the WebUI when "
"running multiple Masters.",
"2. This is broken currently \"on the cloud\" (e.g., EC2) as "
"this will attempt to redirect to the private IP address, unless "
"`advertise_ip` points to an externally accessible IP"),
AUTHENTICATION(false));
}
// Redirects `request` to the leading master.
//
// Outcomes:
//   - `ServiceUnavailable` when no leader is known;
//   - `InternalServerError` when the leader's hostname cannot be
//     determined;
//   - `TemporaryRedirect` to the leader's base URL when the request
//     targets '/redirect' or '/<master id>/redirect';
//   - `NotFound` for any path below those two;
//   - `TemporaryRedirect` to the leader's base URL followed by the
//     original request URL otherwise.
Future<Response> Master::Http::redirect(const Request& request) const
{
  // If there's no leader, return `ServiceUnavailable`.
  if (master->leader.isNone()) {
    LOG(WARNING) << "Current master is not elected as leader, and leader "
                 << "information is unavailable. Failed to redirect the "
                 << "request url: " << request.url;
    return ServiceUnavailable("No leader elected");
  }

  const MasterInfo leader = master->leader.get();

  // Prefer the advertised hostname; fall back to resolving the IP.
  //
  // NOTE: Currently, 'leader.ip()' stores ip in network order, which
  // should be fixed. See MESOS-1201 for details.
  const Try<string> hostname = leader.has_hostname()
    ? Try<string>(leader.hostname())
    : net::getHostname(net::IP(ntohl(leader.ip())));

  if (hostname.isError()) {
    return InternalServerError(hostname.error());
  }

  LOG(INFO) << "Redirecting request for " << request.url
            << " to the leading master " << hostname.get();

  // NOTE: We can use a protocol-relative URL here in order to allow
  // the browser (or other HTTP client) to prefix with 'http:' or
  // 'https:' depending on the original request. See
  // https://tools.ietf.org/html/rfc7231#section-7.1.2 as well as
  // http://stackoverflow.com/questions/12436669/using-protocol-relative-uris-within-location-headers
  // which discusses this.
  const string basePath =
    "//" + hostname.get() + ":" + stringify(leader.port());

  const string redirectPath = "/redirect";
  const string masterRedirectPath = "/" + master->self().id + "/redirect";

  // When request url is '/redirect' or '/master/redirect', redirect to the
  // base url of leading master to avoid infinite redirect loop.
  if (request.url.path == redirectPath ||
      request.url.path == masterRedirectPath) {
    return TemporaryRedirect(basePath);
  }

  // Anything below the redirect endpoints is rejected to prevent a
  // redirection loop.
  if (strings::startsWith(request.url.path, redirectPath + "/") ||
      strings::startsWith(request.url.path, masterRedirectPath + "/")) {
    return NotFound();
  }

  // `request.url` is not absolute so we can safely append it to
  // `basePath`. See https://tools.ietf.org/html/rfc2616#section-5.1.2
  // for details.
  CHECK(!request.url.isAbsolute());

  return TemporaryRedirect(basePath + stringify(request.url));
}
// Returns the help text for the reserve endpoint: a short summary, a
// description of the possible responses and expected request values,
// and the authentication / authorization requirements.
string Master::Http::RESERVE_HELP()
{
  const auto tldrText = TLDR(
      "Reserve resources dynamically on a specific agent.");

  const auto descriptionText = DESCRIPTION(
      "Returns 202 ACCEPTED which indicates that the reserve",
      "operation has been validated successfully by the master.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "The request is then forwarded asynchronously to the Mesos",
      "agent where the reserved resources are located.",
      "That asynchronous message may not be delivered or",
      "reserving resources at the agent might fail.",
      "",
      "Please provide \"slaveId\" and \"resources\" values describing",
      "the resources to be reserved.");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "Using this endpoint to reserve resources requires that the",
      "current principal is authorized to reserve resources for the",
      "specific role.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}
// Handles a reserve request whose body is a query string carrying a
// "slaveId" value and a "resources" value (a JSON array of resources).
//
// The request is rejected up front when the principal has no value
// string, redirected when this master is not elected, and refused with
// `MethodNotAllowed` unless the method is POST. Any failure to decode
// or parse the body yields `BadRequest`. On success the parsed agent id
// and resources are passed on to `_reserve`.
Future<Response> Master::Http::reserve(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // Only the elected master serves this request; others redirect.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // The body is expected to be encoded as a query string.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  // Extract the agent id.
  Option<string> slaveIdParameter = parameters.get("slaveId");
  if (slaveIdParameter.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(slaveIdParameter.get());

  // Extract the resources, which must form a JSON array.
  Option<string> resourcesParameter = parameters.get("resources");
  if (resourcesParameter.isNone()) {
    return BadRequest(
        "Missing 'resources' query parameter in the request body");
  }

  Try<JSON::Array> resourcesJson =
    JSON::parse<JSON::Array>(resourcesParameter.get());

  if (resourcesJson.isError()) {
    return BadRequest(
        "Error in parsing 'resources' query parameter in the request body: " +
        resourcesJson.error());
  }

  // Convert every array element into a `Resource`, giving up on the
  // first element that fails to parse.
  RepeatedPtrField<Resource> resources;
  foreach (const JSON::Value& element, resourcesJson.get().values) {
    Try<Resource> parsed = ::protobuf::parse<Resource>(element);
    if (parsed.isError()) {
      return BadRequest(
          "Error in parsing 'resources' query parameter in the request body: " +
          parsed.error());
    }

    resources.Add()->CopyFrom(parsed.get());
  }

  return _reserve(slaveId, resources, principal);
}
// Builds a RESERVE offer operation for `resources` on the agent
// identified by `slaveId`, validates it, and asks the master to
// authorize it for `principal`.
//
// Returns `BadRequest` when the agent is not registered or when
// validation fails. Once authorization completes, the continuation
// returns `Forbidden` if the principal is not authorized and otherwise
// forwards the operation to `_operation`.
Future<Response> Master::Http::_reserve(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& resources,
    const Option<Principal>& principal) const
{
  Slave* slave = master->slaves.registered.get(slaveId);
  if (slave == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Wrap the requested resources in a RESERVE offer operation.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::RESERVE);
  operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);

  const Option<Error> normalizationError =
    validateAndNormalizeResources(&operation);

  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  const Option<Error> validationError = validation::operation::validate(
      operation.reserve(), principal, slave->capabilities);

  if (validationError.isSome()) {
    return BadRequest(
        "Invalid RESERVE operation on agent " + stringify(*slave) + ": " +
        validationError->message);
  }

  return master->authorizeReserveResources(operation.reserve(), principal)
    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
      if (authorized) {
        // We only allow "pushing" a single reservation at a time, so we
        // require the resources with one reservation "popped" to be
        // present on the agent.
        Resources expectedOnAgent =
          Resources(operation.reserve().resources()).popReservation();

        return _operation(slaveId, expectedOnAgent, operation);
      }

      return Forbidden();
    }));
}
// Handles a `RESERVE_RESOURCES` call by handing the agent id and the
// resources carried in the call to `_reserve`.
//
// NOTE: `contentType` is not used by this function.
Future<Response> Master::Http::reserveResources(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::RESERVE_RESOURCES, call.type());

  const auto& reserveCall = call.reserve_resources();

  return _reserve(reserveCall.slave_id(), reserveCall.resources(), principal);
}
// Returns the help text for the endpoint that reports information
// about agents: a short summary, a description of the possible
// responses and the supported query parameter, and the authentication
// requirement.
string Master::Http::SLAVES_HELP()
{
  const auto tldrText = TLDR(
      "Information about agents.");

  const auto descriptionText = DESCRIPTION(
      "Returns 200 OK when the request was processed successfully.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "This endpoint shows information about the agents which are registered",
      "in this master or recovered from registry, formatted as a JSON",
      "object.",
      "",
      "Query parameters:",
      "> slave_id=VALUE The ID of the slave returned "
      "(when no slave_id is specified, all slaves will be returned).");

  const auto authenticationText = AUTHENTICATION(true);

  return HELP(tldrText, descriptionText, authenticationText);
}
// Serves agent information as JSON.
//
// Redirects when this master is not elected. Otherwise it creates an
// authorization acceptor for `VIEW_ROLE` and an id acceptor from the
// optional "slave_id" query parameter, and once both are available
// responds with the output of `SlavesWriter`, passing along the
// optional "jsonp" query parameter.
Future<Response> Master::Http::slaves(
    const Request& request,
    const Option<Principal>& principal) const
{
  // Only the elected master serves this request; others redirect.
  if (!master->elected()) {
    return redirect(request);
  }

  Future<Owned<AuthorizationAcceptor>> roleAcceptorFuture =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_ROLE);

  Future<IDAcceptor<SlaveID>> slaveIdAcceptorFuture =
    IDAcceptor<SlaveID>(request.url.query.get("slave_id"));

  Option<string> jsonp = request.url.query.get("jsonp");

  // The continuation captures the master pointer by value rather than
  // capturing `this`.
  Master* masterPointer = this->master;

  return collect(roleAcceptorFuture, slaveIdAcceptorFuture)
    .then(defer(
        masterPointer->self(),
        [masterPointer, jsonp](
            const tuple<Owned<AuthorizationAcceptor>,
                        IDAcceptor<SlaveID>>& collected)
          -> Future<Response> {
          // Unpack the two acceptors produced by `collect`.
          Owned<AuthorizationAcceptor> roleAcceptor;
          IDAcceptor<SlaveID> slaveIdAcceptor;
          tie(roleAcceptor, slaveIdAcceptor) = collected;

          return OK(
              jsonify(SlavesWriter(
                  masterPointer->slaves, roleAcceptor, slaveIdAcceptor)),
              jsonp);
        }));
}
// Handles a `GET_AGENTS` call: creates an authorization acceptor for
// `VIEW_ROLE` and, once it is available, responds with a `GET_AGENTS`
// response filled in by `_getAgents` and serialized according to
// `contentType`.
Future<process::http::Response> Master::Http::getAgents(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());

  Future<Owned<AuthorizationAcceptor>> acceptorFuture =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_ROLE);

  return acceptorFuture.then(defer(
      master->self(),
      [this, contentType](const Owned<AuthorizationAcceptor>& acceptor)
        -> Future<process::http::Response> {
        mesos::master::Response reply;
        reply.set_type(mesos::master::Response::GET_AGENTS);
        reply.mutable_get_agents()->CopyFrom(_getAgents(acceptor));

        return OK(
            serialize(contentType, evolve(reply)),
            stringify(contentType));
      }));
}
// Assembles the `GetAgents` message.
//
// Every registered agent contributes an entry produced by
// `createAgentResponse`. Every recovered agent contributes a copy of
// its `SlaveInfo` whose resources are limited to those for which
// `authorizeResource` succeeds with `rolesAcceptor`.
mesos::master::Response::GetAgents Master::Http::_getAgents(
    const Owned<AuthorizationAcceptor>& rolesAcceptor) const
{
  mesos::master::Response::GetAgents result;

  foreachvalue (const Slave* slave, master->slaves.registered) {
    result.add_agents()->CopyFrom(
        protobuf::master::event::createAgentResponse(*slave, rolesAcceptor));
  }

  foreachvalue (const SlaveInfo& recoveredInfo, master->slaves.recovered) {
    SlaveInfo* recovered = result.add_recovered_agents();
    recovered->CopyFrom(recoveredInfo);

    // Drop the copied resources and re-add only the authorized ones.
    recovered->clear_resources();
    foreach (const Resource& resource, recoveredInfo.resources()) {
      if (!authorizeResource(resource, rolesAcceptor)) {
        continue;
      }

      recovered->add_resources()->CopyFrom(resource);
    }
  }

  return result;
}
// Returns the help text for the quota endpoint: a short summary, a
// description of the possible responses and of the GET / POST / DELETE
// methods, and the authentication / authorization requirements.
string Master::Http::QUOTA_HELP()
{
  const auto tldrText = TLDR(
      "Gets or updates quota for roles.");

  const auto descriptionText = DESCRIPTION(
      "Returns 200 OK when the quota was queried or updated successfully.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "GET: Returns the currently set quotas as JSON.",
      "",
      "POST: Validates the request body as JSON",
      " and sets quota for a role.",
      "",
      "DELETE: Validates the request body as JSON",
      " and removes quota for a role.");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "Using this endpoint to set a quota for a certain role requires that",
      "the current principal is authorized to set quota for the target role.",
      "Similarly, removing quota requires that the principal is authorized",
      "to remove quota created by the quota_principal.",
      "Getting quota information for a certain role requires that the",
      "current principal is authorized to get quota for the target role,",
      "otherwise the entry for the target role could be silently filtered.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}
// Entry point for quota requests.
//
// Rejects principals without a value string, redirects when this
// master is not elected, and otherwise hands the request to the
// `quotaHandler` member function matching the HTTP method (GET, POST or
// DELETE). Any other method yields `MethodNotAllowed`.
Future<Response> Master::Http::quota(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // Only the elected master serves this request; others redirect.
  if (!master->elected()) {
    return redirect(request);
  }

  // Pick the `QuotaHandler` operation from the HTTP method.
  const auto& method = request.method;

  if (method == "GET") {
    return quotaHandler.status(request, principal);
  } else if (method == "POST") {
    return quotaHandler.set(request, principal);
  } else if (method == "DELETE") {
    return quotaHandler.remove(request, principal);
  }

  // TODO(joerg84): Add update logic for PUT requests
  // once Quota supports updates.
  return MethodNotAllowed({"GET", "POST", "DELETE"}, request.method);
}
// Returns the help text for the weights endpoint: a short summary, a
// description of the possible responses and of the PUT method, and the
// authentication / authorization requirements.
string Master::Http::WEIGHTS_HELP()
{
  const auto tldrText = TLDR(
      "Updates weights for the specified roles.");

  const auto descriptionText = DESCRIPTION(
      "Returns 200 OK when the weights update was successful.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "PUT: Validates the request body as JSON",
      "and updates the weights for the specified roles.");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "Getting weight information for a role requires that the current",
      "principal is authorized to get weights for the target role,",
      "otherwise the entry for the target role could be silently filtered.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}
Future<Response> Master::Http::weights(
const Request& request,
const Option<Principal>& principal) const
{
// TODO(greggomann): Remove this check once the `Principal` type is used in
// `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
// See MESOS-7202.