blob: ff3408f65ee68ab12a219918637bbd1065fcb484 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <tuple>
#include <vector>
#include <mesos/attributes.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/agent/agent.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/executor/executor.hpp>
#include <mesos/slave/containerizer.hpp>
#include <mesos/v1/agent/agent.hpp>
#include <mesos/v1/executor/executor.hpp>
#include <process/collect.hpp>
#include <process/future.hpp>
#include <process/help.hpp>
#include <process/http.hpp>
#include <process/limiter.hpp>
#include <process/logging.hpp>
#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/foreach.hpp>
#include <stout/json.hpp>
#include <stout/jsonify.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/numify.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
#include <stout/unreachable.hpp>
#include "common/authorization.hpp"
#include "common/build.hpp"
#include "common/future_tracker.hpp"
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "common/resources_utils.hpp"
#include "common/validation.hpp"
#include "internal/devolve.hpp"
#include "mesos/mesos.hpp"
#include "mesos/resources.hpp"
#include "resource_provider/local.hpp"
#include "slave/constants.hpp"
#include "slave/http.hpp"
#include "slave/slave.hpp"
#include "slave/validation.hpp"
#include "slave/containerizer/mesos/containerizer.hpp"
#include "slave/containerizer/mesos/paths.hpp"
#include "version/version.hpp"
using google::protobuf::internal::WireFormatLite;
using mesos::agent::ProcessIO;
using mesos::authorization::createSubject;
using mesos::authorization::VIEW_CONTAINER;
using mesos::authorization::VIEW_FLAGS;
using mesos::authorization::VIEW_FRAMEWORK;
using mesos::authorization::VIEW_TASK;
using mesos::authorization::VIEW_EXECUTOR;
using mesos::authorization::VIEW_RESOURCE_PROVIDER;
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::SET_LOG_LEVEL;
using mesos::authorization::ATTACH_CONTAINER_INPUT;
using mesos::authorization::ATTACH_CONTAINER_OUTPUT;
using mesos::authorization::LAUNCH_NESTED_CONTAINER;
using mesos::authorization::LAUNCH_NESTED_CONTAINER_SESSION;
using mesos::authorization::LAUNCH_STANDALONE_CONTAINER;
using mesos::authorization::WAIT_NESTED_CONTAINER;
using mesos::authorization::WAIT_STANDALONE_CONTAINER;
using mesos::authorization::VIEW_STANDALONE_CONTAINER;
using mesos::authorization::KILL_NESTED_CONTAINER;
using mesos::authorization::KILL_STANDALONE_CONTAINER;
using mesos::authorization::REMOVE_NESTED_CONTAINER;
using mesos::authorization::REMOVE_STANDALONE_CONTAINER;
using mesos::authorization::MODIFY_RESOURCE_PROVIDER_CONFIG;
using mesos::authorization::MARK_RESOURCE_PROVIDER_GONE;
using mesos::authorization::PRUNE_IMAGES;
using mesos::internal::protobuf::WireFormatLite2;
using mesos::internal::recordio::Reader;
using mesos::slave::ContainerClass;
using mesos::slave::ContainerConfig;
using mesos::slave::ContainerTermination;
using process::AUTHENTICATION;
using process::AUTHORIZATION;
using process::Break;
using process::Continue;
using process::ControlFlow;
using process::Clock;
using process::DESCRIPTION;
using process::Failure;
using process::Future;
using process::HELP;
using process::Logging;
using process::loop;
using process::Owned;
using process::TLDR;
using process::http::Accepted;
using process::http::BadRequest;
using process::http::Conflict;
using process::http::Connection;
using process::http::Forbidden;
using process::http::NotFound;
using process::http::InternalServerError;
using process::http::MethodNotAllowed;
using process::http::NotAcceptable;
using process::http::NotImplemented;
using process::http::OK;
using process::http::Pipe;
using process::http::ServiceUnavailable;
using process::http::UnsupportedMediaType;
using process::http::authentication::Principal;
using process::metrics::internal::MetricsProcess;
using ::recordio::Decoder;
using std::function;
using std::list;
using std::map;
using std::string;
using std::tie;
using std::tuple;
using std::vector;
namespace mesos {
// Writes the JSON representation of a `TaskInfo` into `writer`.
//
// The fields "id", "name", "slave_id" and "resources" are always
// emitted. "role" is taken from the allocation info of the first
// resource and is omitted when the task carries no resources.
// "command", "executor_id" and "discovery" are emitted only when the
// corresponding protobuf field is set.
static void json(JSON::ObjectWriter* writer, const TaskInfo& task)
{
  writer->field("id", task.task_id().value());
  writer->field("name", task.name());
  writer->field("slave_id", task.slave_id().value());
  writer->field("resources", task.resources());

  // Tasks are not allowed to mix resources allocated to
  // different roles, see MESOS-6636.
  //
  // Only look at the first resource if there is one: dereferencing
  // `begin()` of an empty repeated field is undefined behavior. This
  // mirrors the guard used when writing an executor's role.
  if (!task.resources().empty()) {
    writer->field("role", task.resources().begin()->allocation_info().role());
  }

  if (task.has_command()) {
    writer->field("command", task.command());
  }

  if (task.has_executor()) {
    writer->field("executor_id", task.executor().executor_id().value());
  }

  if (task.has_discovery()) {
    writer->field("discovery", JSON::Protobuf(task.discovery()));
  }
}
namespace internal {
namespace slave {
// Pull in the process definitions.
using process::http::Response;
using process::http::Request;
// Filtered representation of an Executor. Tasks within this executor
// are filtered based on whether the user is authorized to view them.
struct ExecutorWriter
{
ExecutorWriter(
const Owned<ObjectApprovers>& approvers,
const Executor* executor,
const Framework* framework)
: approvers_(approvers),
executor_(executor),
framework_(framework) {}
void operator()(JSON::ObjectWriter* writer) const
{
writer->field("id", executor_->id.value());
writer->field("name", executor_->info.name());
writer->field("source", executor_->info.source());
writer->field("container", executor_->containerId.value());
writer->field("directory", executor_->directory);
writer->field("resources", executor_->allocatedResources());
// Resources may be empty for command executors.
if (!executor_->info.resources().empty()) {
// Executors are not allowed to mix resources allocated to
// different roles, see MESOS-6636.
writer->field(
"role",
executor_->info.resources().begin()->allocation_info().role());
}
if (executor_->info.has_labels()) {
writer->field("labels", executor_->info.labels());
}
if (executor_->info.has_type()) {
writer->field("type", ExecutorInfo::Type_Name(executor_->info.type()));
}
writer->field("tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (Task* task, executor_->launchedTasks) {
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
writer->field("queued_tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (const TaskInfo& task, executor_->queuedTasks) {
if (!approvers_->approved<VIEW_TASK>(task, framework_->info)) {
continue;
}
writer->element(task);
}
});
writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) {
foreach (const std::shared_ptr<Task>& task, executor_->completedTasks) {
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
// NOTE: We add 'terminatedTasks' to 'completed_tasks' for
// simplicity.
foreachvalue (Task* task, executor_->terminatedTasks) {
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
}
const Owned<ObjectApprovers>& approvers_;
const Executor* executor_;
const Framework* framework_;
};
// Filtered representation of FrameworkInfo.
// Executors and Tasks are filtered based on whether the
// user is authorized to view them.
struct FrameworkWriter
{
FrameworkWriter(
const Owned<ObjectApprovers>& approvers,
const Framework* framework)
: approvers_(approvers),
framework_(framework) {}
void operator()(JSON::ObjectWriter* writer) const
{
writer->field("id", framework_->id().value());
writer->field("name", framework_->info.name());
writer->field("user", framework_->info.user());
writer->field("failover_timeout", framework_->info.failover_timeout());
writer->field("checkpoint", framework_->info.checkpoint());
writer->field("hostname", framework_->info.hostname());
if (framework_->info.has_principal()) {
writer->field("principal", framework_->info.principal());
}
// For multi-role frameworks the `role` field will be unset.
// Note that we could set `roles` here for both cases, which
// would make tooling simpler (only need to look for `roles`).
// However, we opted to just mirror the protobuf akin to how
// generic protobuf -> JSON translation works.
if (framework_->capabilities.multiRole) {
writer->field("roles", framework_->info.roles());
} else {
writer->field("role", framework_->info.role());
}
writer->field("executors", [this](JSON::ArrayWriter* writer) {
foreachvalue (Executor* executor, framework_->executors) {
if (!approvers_->approved<VIEW_EXECUTOR>(
executor->info, framework_->info)) {
continue;
}
ExecutorWriter executorWriter(
approvers_,
executor,
framework_);
writer->element(executorWriter);
}
});
writer->field(
"completed_executors", [this](JSON::ArrayWriter* writer) {
foreach (
const Owned<Executor>& executor, framework_->completedExecutors) {
if (!approvers_->approved<VIEW_EXECUTOR>(
executor->info, framework_->info)) {
continue;
}
ExecutorWriter executorWriter(
approvers_,
executor.get(),
framework_);
writer->element(executorWriter);
}
});
}
const Owned<ObjectApprovers>& approvers_;
const Framework* framework_;
};
// Builds the help text for the agent API endpoint.
string Http::API_HELP()
{
  const auto summary = TLDR("Endpoint for API calls against the agent.");
  const auto details = DESCRIPTION("Returns 200 OK if the call is successful");
  const auto auth = AUTHENTICATION(true);

  return HELP(summary, details, auth);
}
// Entry point for agent API calls.
//
// Validates the HTTP method and the 'Content-Type' / 'Accept' headers
// (plus the message-level headers named by `MESSAGE_CONTENT_TYPE` and
// `MESSAGE_ACCEPT` when a streaming media type is involved),
// deserializes the request body into an `agent::Call` and forwards it
// to `_api()`.
//
// Early responses produced here:
//   - ServiceUnavailable while the agent is in the RECOVERING state.
//   - MethodNotAllowed for anything but POST.
//   - BadRequest if 'Content-Type' is missing, if a streaming request
//     lacks the message content type header, if the streaming body ends
//     before a call could be read, or if the call cannot be
//     deserialized/validated.
//   - UnsupportedMediaType for unknown content types, or if the message
//     content type header is set on a non-streaming request.
//   - NotAcceptable if none of the supported media types is acceptable,
//     or if the message accept header is set for a non-streaming
//     response.
Future<Response> Http::api(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(anand): Add metrics for rejected requests.

  if (slave->state == Slave::RECOVERING) {
    return ServiceUnavailable("Agent has not finished recovery");
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  Option<string> contentType_ = request.headers.get("Content-Type");
  if (contentType_.isNone()) {
    return BadRequest("Expecting 'Content-Type' to be present");
  }

  ContentType contentType;
  if (contentType_.get() == APPLICATION_JSON) {
    contentType = ContentType::JSON;
  } else if (contentType_.get() == APPLICATION_PROTOBUF) {
    contentType = ContentType::PROTOBUF;
  } else if (contentType_.get() == APPLICATION_RECORDIO) {
    contentType = ContentType::RECORDIO;
  } else {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' of ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF +
        " or " + APPLICATION_RECORDIO);
  }

  Option<ContentType> messageContentType;
  Option<string> messageContentType_ =
    request.headers.get(MESSAGE_CONTENT_TYPE);

  if (streamingMediaType(contentType)) {
    // A streaming request must say how the individual messages inside
    // the stream are encoded.
    if (messageContentType_.isNone()) {
      return BadRequest(
          "Expecting '" + stringify(MESSAGE_CONTENT_TYPE) + "' to be" +
          " set for streaming requests");
    }

    if (messageContentType_.get() == APPLICATION_JSON) {
      messageContentType = Option<ContentType>(ContentType::JSON);
    } else if (messageContentType_.get() == APPLICATION_PROTOBUF) {
      messageContentType = Option<ContentType>(ContentType::PROTOBUF);
    } else {
      return UnsupportedMediaType(
          string("Expecting '") + MESSAGE_CONTENT_TYPE + "' of " +
          APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
    }
  } else {
    // Validate that a client has not set the "Message-Content-Type"
    // header for a non-streaming request.
    if (messageContentType_.isSome()) {
      return UnsupportedMediaType(
          string("Expecting '") + MESSAGE_CONTENT_TYPE + "' to be not"
          " set for non-streaming requests");
    }
  }

  // This lambda deserializes a string into a valid `Call`
  // based on the content type.
  auto deserializer = [](const string& body, ContentType contentType)
      -> Try<mesos::agent::Call> {
    Try<v1::agent::Call> v1Call =
      deserialize<v1::agent::Call>(contentType, body);

    if (v1Call.isError()) {
      return Error(v1Call.error());
    }

    mesos::agent::Call call = devolve(v1Call.get());

    Option<Error> error = validation::agent::call::validate(call);
    if (error.isSome()) {
      return Error("Failed to validate agent::Call: " + error->message);
    }

    return call;
  };

  // For backwards compatibility, if a client does not specify an 'Accept'
  // header, 'Content-Type' of the response is set to 'application/json'
  // for streaming responses.
  //
  // TODO(anand): In v2 API, the default 'Content-Type' for streaming responses
  // should be 'application/recordio'.
  ContentType acceptType;
  if (request.acceptsMediaType(APPLICATION_JSON)) {
    acceptType = ContentType::JSON;
  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
    acceptType = ContentType::PROTOBUF;
  } else if (request.acceptsMediaType(APPLICATION_RECORDIO)) {
    acceptType = ContentType::RECORDIO;
  } else {
    return NotAcceptable(
        string("Expecting 'Accept' to allow ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF + " or " +
        APPLICATION_RECORDIO);
  }

  Option<ContentType> messageAcceptType;
  if (streamingMediaType(acceptType)) {
    // Note that `acceptsMediaType()` returns true if the given headers
    // field does not exist, i.e. by default we return JSON here.
    if (request.acceptsMediaType(MESSAGE_ACCEPT, APPLICATION_JSON)) {
      messageAcceptType = ContentType::JSON;
    } else if (request.acceptsMediaType(MESSAGE_ACCEPT, APPLICATION_PROTOBUF)) {
      messageAcceptType = ContentType::PROTOBUF;
    } else {
      return NotAcceptable(
          string("Expecting '") + MESSAGE_ACCEPT + "' to allow " +
          APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
    }
  } else {
    // Validate that a client has not set the "Message-Accept"
    // header for a non-streaming response.
    if (request.headers.contains(MESSAGE_ACCEPT)) {
      return NotAcceptable(
          string("Expecting '") + MESSAGE_ACCEPT +
          "' to be not set for non-streaming responses");
    }
  }

  // The body is consumed through the request's pipe reader below.
  CHECK_EQ(Request::PIPE, request.type);
  CHECK_SOME(request.reader);

  RequestMediaTypes mediaTypes {
      contentType, acceptType, messageContentType, messageAcceptType};

  if (streamingMediaType(contentType)) {
    CHECK_SOME(mediaTypes.messageContent);

    // Only the first record of the stream is read here; the reader is
    // passed on to `_api()` together with the decoded call.
    Owned<Reader<mesos::agent::Call>> reader(new Reader<mesos::agent::Call>(
        lambda::bind(
            deserializer, lambda::_1, mediaTypes.messageContent.get()),
        request.reader.get()));

    return reader->read()
      .then(defer(
          slave->self(),
          [=](const Result<mesos::agent::Call>& call) -> Future<Response> {
            if (call.isNone()) {
              return BadRequest("Received EOF while reading request body");
            }

            if (call.isError()) {
              return BadRequest(call.error());
            }

            return _api(call.get(), std::move(reader), mediaTypes, principal);
          }));
  } else {
    Pipe::Reader reader = request.reader.get(); // Remove const.

    return reader.readAll()
      .then(defer(
          slave->self(),
          [=](const string& body) -> Future<Response> {
            Try<mesos::agent::Call> call = deserializer(body, contentType);
            if (call.isError()) {
              return BadRequest(call.error());
            }

            return _api(call.get(), None(), mediaTypes, principal);
          }));
  }
}
// Second stage of agent API call handling: checks that the negotiated
// media types are compatible with the call type and then dispatches to
// the handler for that call type.
//
// `reader` must be set for ATTACH_CONTAINER_INPUT calls (enforced with a
// CHECK); it is not used for any other call type.
Future<Response> Http::_api(
    const mesos::agent::Call& call,
    Option<Owned<Reader<mesos::agent::Call>>>&& reader,
    const RequestMediaTypes& mediaTypes,
    const Option<Principal>& principal) const
{
  using Call = mesos::agent::Call;

  const bool streamingRequest = streamingMediaType(mediaTypes.content);
  const bool attachInput = call.type() == Call::ATTACH_CONTAINER_INPUT;

  // Validate that a client has not _accidentally_ sent us a
  // streaming request for a call type that does not support it.
  if (streamingRequest && !attachInput) {
    return UnsupportedMediaType(
        "Streaming 'Content-Type' " + stringify(mediaTypes.content) +
        " is not supported for " + stringify(call.type()) + " call");
  }

  // Conversely, ATTACH_CONTAINER_INPUT only works as a streaming request.
  if (!streamingRequest && attachInput) {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' to be ") + APPLICATION_RECORDIO +
        " for " + stringify(call.type()) + " call");
  }

  // Only these two call types can produce a streaming response.
  const bool streamingResponseSupported =
    call.type() == Call::ATTACH_CONTAINER_OUTPUT ||
    call.type() == Call::LAUNCH_NESTED_CONTAINER_SESSION;

  if (streamingMediaType(mediaTypes.accept) && !streamingResponseSupported) {
    return NotAcceptable("Streaming response is not supported for " +
                         stringify(call.type()) + " call");
  }

  // Each handler must log separately to add context
  // it might extract from the nested message.
  switch (call.type()) {
    case Call::UNKNOWN:
      return NotImplemented();

    case Call::GET_HEALTH:
      return getHealth(call, mediaTypes.accept, principal);

    case Call::GET_FLAGS:
      return getFlags(call, mediaTypes.accept, principal);

    case Call::GET_VERSION:
      return getVersion(call, mediaTypes.accept, principal);

    case Call::GET_METRICS:
      return getMetrics(call, mediaTypes.accept, principal);

    case Call::GET_LOGGING_LEVEL:
      return getLoggingLevel(call, mediaTypes.accept, principal);

    case Call::SET_LOGGING_LEVEL:
      return setLoggingLevel(call, mediaTypes.accept, principal);

    case Call::LIST_FILES:
      return listFiles(call, mediaTypes.accept, principal);

    case Call::READ_FILE:
      return readFile(call, mediaTypes.accept, principal);

    case Call::GET_STATE:
      return getState(call, mediaTypes.accept, principal);

    case Call::GET_CONTAINERS:
      return getContainers(call, mediaTypes.accept, principal);

    case Call::GET_FRAMEWORKS:
      return getFrameworks(call, mediaTypes.accept, principal);

    case Call::GET_EXECUTORS:
      return getExecutors(call, mediaTypes.accept, principal);

    case Call::GET_OPERATIONS:
      return getOperations(call, mediaTypes.accept, principal);

    case Call::GET_TASKS:
      return getTasks(call, mediaTypes.accept, principal);

    case Call::GET_AGENT:
      return getAgent(call, mediaTypes.accept, principal);

    case Call::GET_RESOURCE_PROVIDERS:
      return getResourceProviders(call, mediaTypes.accept, principal);

    case Call::LAUNCH_NESTED_CONTAINER:
      return launchNestedContainer(call, mediaTypes.accept, principal);

    case Call::WAIT_NESTED_CONTAINER:
      return waitNestedContainer(call, mediaTypes.accept, principal);

    case Call::KILL_NESTED_CONTAINER:
      return killNestedContainer(call, mediaTypes.accept, principal);

    case Call::REMOVE_NESTED_CONTAINER:
      return removeNestedContainer(call, mediaTypes.accept, principal);

    case Call::LAUNCH_NESTED_CONTAINER_SESSION:
      return launchNestedContainerSession(call, mediaTypes, principal);

    case Call::ATTACH_CONTAINER_INPUT:
      CHECK_SOME(reader);
      return attachContainerInput(
          call, std::move(reader).get(), mediaTypes, principal);

    case Call::ATTACH_CONTAINER_OUTPUT:
      return attachContainerOutput(call, mediaTypes, principal);

    case Call::LAUNCH_CONTAINER:
      return launchContainer(call, mediaTypes.accept, principal);

    case Call::WAIT_CONTAINER:
      return waitContainer(call, mediaTypes.accept, principal);

    case Call::KILL_CONTAINER:
      return killContainer(call, mediaTypes.accept, principal);

    case Call::REMOVE_CONTAINER:
      return removeContainer(call, mediaTypes.accept, principal);

    case Call::ADD_RESOURCE_PROVIDER_CONFIG:
      return addResourceProviderConfig(call, principal);

    case Call::UPDATE_RESOURCE_PROVIDER_CONFIG:
      return updateResourceProviderConfig(call, principal);

    case Call::REMOVE_RESOURCE_PROVIDER_CONFIG:
      return removeResourceProviderConfig(call, principal);

    case Call::MARK_RESOURCE_PROVIDER_GONE:
      return markResourceProviderGone(call, principal);

    case Call::PRUNE_IMAGES:
      return pruneImages(call, mediaTypes.accept, principal);
  }

  UNREACHABLE();
}
// Builds the help text for the executor HTTP API endpoint.
string Http::EXECUTOR_HELP() {
  const auto summary = TLDR("Endpoint for the Executor HTTP API.");

  const auto details = DESCRIPTION(
      "This endpoint is used by the executors to interact with the",
      "agent via Call/Event messages.",
      "",
      "Returns 200 OK iff the initial SUBSCRIBE Call is successful.",
      "This will result in a streaming response via chunked",
      "transfer encoding. The executors can process the response",
      "incrementally.",
      "",
      "Returns 202 Accepted for all other Call messages iff the",
      "request is accepted.");

  const auto auth = AUTHENTICATION(true);

  return HELP(summary, details, auth);
}
// Checks that the claims of an authenticated `principal` identify the
// executor a call is made for: the 'fid', 'eid' and 'cid' claims must
// be present and equal to the given framework, executor and container
// IDs respectively. Returns an `Error` describing the first mismatch,
// or `None()` if all three claims match.
//
// TODO(greggomann): Remove this function when implicit executor authorization
// is moved into the authorizer. See MESOS-7399.
Option<Error> verifyExecutorClaims(
    const Principal& principal,
    const FrameworkID& frameworkId,
    const ExecutorID& executorId,
    const ContainerID& containerId) {
  // True iff the principal carries the claim `key` with exactly the
  // value `expected`.
  auto claimMatches = [&principal](const string& key, const string& expected) {
    return principal.claims.contains(key) &&
           principal.claims.at(key) == expected;
  };

  if (!claimMatches("fid", frameworkId.value())) {
    return Error(
        "Authenticated principal '" + stringify(principal) + "' does not "
        "contain an 'fid' claim with the framework ID " +
        stringify(frameworkId) + ", which is set in the call");
  }

  if (!claimMatches("eid", executorId.value())) {
    return Error(
        "Authenticated principal '" + stringify(principal) + "' does not "
        "contain an 'eid' claim with the executor ID " +
        stringify(executorId) + ", which is set in the call");
  }

  if (!claimMatches("cid", containerId.value())) {
    return Error(
        "Authenticated principal '" + stringify(principal) + "' does not "
        "contain a 'cid' claim with the correct active ContainerID");
  }

  return None();
}
// Handles a request on the executor endpoint.
//
// The body is parsed as a `v1::executor::Call` (protobuf or JSON,
// selected by 'Content-Type'), devolved to an `executor::Call`,
// validated, and then handled according to its type. The order of the
// checks below determines which error response a client receives, so
// it should not be rearranged casually.
Future<Response> Http::executor(
const Request& request,
const Option<Principal>& principal) const
{
// While `recoveryInfo.reconnect` evaluates to false the agent is
// expected to still be in the RECOVERING state (enforced by the CHECK)
// and the request is rejected.
if (!slave->recoveryInfo.reconnect) {
CHECK_EQ(slave->state, Slave::RECOVERING);
return ServiceUnavailable("Agent has not finished recovery");
}
// TODO(anand): Add metrics for rejected requests.
if (request.method != "POST") {
return MethodNotAllowed({"POST"}, request.method);
}
v1::executor::Call v1Call;
Option<string> contentType = request.headers.get("Content-Type");
if (contentType.isNone()) {
return BadRequest("Expecting 'Content-Type' to be present");
}
// Decode the body according to the declared content type.
if (contentType.get() == APPLICATION_PROTOBUF) {
if (!v1Call.ParseFromString(request.body)) {
return BadRequest("Failed to parse body into Call protobuf");
}
} else if (contentType.get() == APPLICATION_JSON) {
Try<JSON::Value> value = JSON::parse(request.body);
if (value.isError()) {
return BadRequest("Failed to parse body into JSON: " + value.error());
}
Try<v1::executor::Call> parse =
::protobuf::parse<v1::executor::Call>(value.get());
if (parse.isError()) {
return BadRequest("Failed to convert JSON into Call protobuf: " +
parse.error());
}
v1Call = parse.get();
} else {
return UnsupportedMediaType(
string("Expecting 'Content-Type' of ") +
APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
}
const executor::Call call = devolve(v1Call);
Option<Error> error = common::validation::validateExecutorCall(call);
if (error.isSome()) {
return BadRequest("Failed to validate Executor::Call: " + error->message);
}
// NOTE: `acceptType` is only assigned for SUBSCRIBE calls, and it is
// only read in the SUBSCRIBE case of the switch further down.
ContentType acceptType;
if (call.type() == executor::Call::SUBSCRIBE) {
// We default to JSON since an empty 'Accept' header
// results in all media types considered acceptable.
if (request.acceptsMediaType(APPLICATION_JSON)) {
acceptType = ContentType::JSON;
} else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
acceptType = ContentType::PROTOBUF;
} else {
return NotAcceptable(
string("Expecting 'Accept' to allow ") +
"'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
}
} else if (call.type() == executor::Call::HEARTBEAT) {
// We return early here before doing any validation because currently
// this proto may contain dummy values for framework and executor IDs
// (which is safe).
// See: TODO inside `heartbeat()` in src/executor/executor.cpp.
return Accepted();
} else {
// Calls other than SUBSCRIBE and HEARTBEAT are rejected while the
// agent is still recovering.
if (slave->state == Slave::RECOVERING) {
return ServiceUnavailable("Agent has not finished recovery");
}
}
// We consolidate the framework/executor lookup logic here because
// it is common for all the call handlers.
Framework* framework = slave->getFramework(call.framework_id());
if (framework == nullptr) {
return BadRequest("Framework cannot be found");
}
Executor* executor = framework->getExecutor(call.executor_id());
if (executor == nullptr) {
return BadRequest("Executor cannot be found");
}
// TODO(greggomann): Move this implicit executor authorization
// into the authorizer. See MESOS-7399.
//
// The claims are only verified when a principal is present; a request
// without a principal skips this check.
if (principal.isSome()) {
error = verifyExecutorClaims(
principal.get(),
call.framework_id(),
call.executor_id(),
executor->containerId);
if (error.isSome()) {
return Forbidden(error->message);
}
}
// An executor in the REGISTERING state may only send SUBSCRIBE.
if (executor->state == Executor::REGISTERING &&
call.type() != executor::Call::SUBSCRIBE) {
return Forbidden("Executor is not subscribed");
}
switch (call.type()) {
case executor::Call::SUBSCRIBE: {
// Respond with a streaming (PIPE) response: the reader end goes to
// the HTTP response, the writer end is handed to the agent through
// the `StreamingHttpConnection`.
Pipe pipe;
OK ok;
ok.headers["Content-Type"] = stringify(acceptType);
ok.type = Response::PIPE;
ok.reader = pipe.reader();
StreamingHttpConnection<v1::executor::Event> http(
pipe.writer(), acceptType);
slave->subscribe(http, call.subscribe(), framework, executor);
return ok;
}
case executor::Call::UPDATE: {
// Wrap the reported status into a status update and pass it to the
// agent.
slave->statusUpdate(protobuf::createStatusUpdate(
call.framework_id(),
call.update().status(),
slave->info.id()),
None());
return Accepted();
}
case executor::Call::MESSAGE: {
// Pass the message payload on to the agent.
slave->executorMessage(
slave->info.id(),
framework->id(),
executor->id,
call.message().data());
return Accepted();
}
case executor::Call::HEARTBEAT: {
// This should be handled before hitting this switch statement.
UNREACHABLE();
}
case executor::Call::UNKNOWN: {
LOG(WARNING) << "Received 'UNKNOWN' call";
return NotImplemented();
}
}
UNREACHABLE();
}
// Builds the help text for the local resource provider HTTP API
// endpoint.
string Http::RESOURCE_PROVIDER_HELP() {
  const auto summary =
    TLDR("Endpoint for the local resource provider HTTP API.");

  const auto details = DESCRIPTION(
      "This endpoint is used by the local resource providers to interact",
      "with the agent via Call/Event messages.",
      "",
      "Returns 200 OK iff the initial SUBSCRIBE Call is successful. This",
      "will result in a streaming response via chunked transfer encoding.",
      "The local resource providers can process the response incrementally.",
      "",
      "Returns 202 Accepted for all other Call messages iff the request is",
      "accepted.");

  const auto auth = AUTHENTICATION(true);

  return HELP(summary, details, auth);
}
// Builds the help text for the flags endpoint. No description section
// is provided (`None()`), only the summary plus the authentication and
// authorization notes.
string Http::FLAGS_HELP()
{
  const auto summary = TLDR("Exposes the agent's flag configuration.");

  const auto auth = AUTHENTICATION(true);

  const auto authz = AUTHORIZATION(
      "The request principal should be authorized to view all flags.",
      "See the authorization documentation for details.");

  return HELP(summary, None(), auth, authz);
}
// Serves the agent's flags as JSON (optionally wrapped via the 'jsonp'
// query parameter).
//
// Without an authorizer the flags are returned directly. With an
// authorizer, only GET is allowed and the principal must be authorized
// for VIEW_FLAGS; otherwise the response is Forbidden.
Future<Response> Http::flags(
    const Request& request,
    const Option<Principal>& principal) const
{
  const bool authorizationEnabled = slave->authorizer.isSome();

  // TODO(nfnt): Remove check for enabled
  // authorization as part of MESOS-5346.
  if (authorizationEnabled && request.method != "GET") {
    return MethodNotAllowed({"GET"}, request.method);
  }

  if (!authorizationEnabled) {
    return OK(_flags(), request.url.query.get("jsonp"));
  }

  authorization::Request authRequest;
  authRequest.set_action(authorization::VIEW_FLAGS);

  // The subject is only attached if one can be derived from the
  // principal.
  Option<authorization::Subject> subject = createSubject(principal);
  if (subject.isSome()) {
    authRequest.mutable_subject()->CopyFrom(subject.get());
  }

  return slave->authorizer.get()->authorized(authRequest)
    .then(defer(
        slave->self(),
        [this, request](bool authorized) -> Future<Response> {
          if (!authorized) {
            return Forbidden();
          }

          return OK(_flags(), request.url.query.get("jsonp"));
        }));
}
// Returns a JSON object of the form {"flags": {<name>: <value>, ...}}
// containing every agent flag that has a string representation. Flags
// are keyed by their effective name.
JSON::Object Http::_flags() const
{
  JSON::Object flagValues;

  foreachvalue (const flags::Flag& flag, slave->flags) {
    const Option<string> rendered = flag.stringify(slave->flags);

    // Flags without a value are skipped.
    if (rendered.isNone()) {
      continue;
    }

    flagValues.values[flag.effective_name().value] = rendered.get();
  }

  JSON::Object result;
  result.values["flags"] = std::move(flagValues);

  return result;
}
// Handler for the GET_FLAGS call: returns the agent's flags serialized
// in `acceptType`, or Forbidden if the principal is not approved for
// VIEW_FLAGS.
Future<Response> Http::getFlags(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_FLAGS, call.type());

  LOG(INFO) << "Processing GET_FLAGS call";

  return ObjectApprovers::create(slave->authorizer, principal, {VIEW_FLAGS})
    .then(defer(
        slave->self(),
        [this, acceptType](
            const Owned<ObjectApprovers>& approvers) -> Response {
          if (approvers->approved<VIEW_FLAGS>()) {
            return OK(
                serialize(
                    acceptType,
                    evolve<v1::agent::Response::GET_FLAGS>(_flags())),
                stringify(acceptType));
          }

          return Forbidden();
        }));
}
// Builds the help text for the health endpoint, which is documented as
// not requiring authentication.
string Http::HEALTH_HELP()
{
  const auto summary = TLDR("Health check of the Agent.");

  const auto details = DESCRIPTION(
      "Returns 200 OK iff the Agent is healthy.",
      "Delayed responses are also indicative of poor health.");

  const auto auth = AUTHENTICATION(false);

  return HELP(summary, details, auth);
}
// Health endpoint handler: unconditionally answers 200 OK. The request
// itself is not inspected.
Future<Response> Http::health(const Request& /*request*/) const
{
  return OK();
}
// Handler for the GET_HEALTH call: always reports `healthy = true`,
// serialized in `acceptType`.
Future<Response> Http::getHealth(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_HEALTH, call.type());

  LOG(INFO) << "Processing GET_HEALTH call";

  mesos::agent::Response reply;
  reply.set_type(mesos::agent::Response::GET_HEALTH);

  auto* health = reply.mutable_get_health();
  health->set_healthy(true);

  return OK(
      serialize(acceptType, evolve(reply)),
      stringify(acceptType));
}
// Handler for the GET_VERSION call: returns the result of `version()`
// evolved into a v1 GET_VERSION response and serialized in
// `acceptType`.
Future<Response> Http::getVersion(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_VERSION, call.type());

  LOG(INFO) << "Processing GET_VERSION call";

  return OK(
      serialize(
          acceptType,
          evolve<v1::agent::Response::GET_VERSION>(version())),
      stringify(acceptType));
}
// Handler for the GET_METRICS call: takes a metrics snapshot (with the
// optional timeout from the call) and writes it out as a
// `v1::agent::Response` of type GET_METRICS.
//
// The response message is not built as a protobuf object; instead the
// two fields (type and get_metrics) are written directly in the
// requested encoding. Content types other than protobuf and JSON yield
// NotAcceptable.
Future<Response> Http::getMetrics(
    const mesos::agent::Call& call,
    ContentType contentType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_METRICS, call.type());
  CHECK(call.has_get_metrics());

  LOG(INFO) << "Processing GET_METRICS call";

  const auto& getMetricsCall = call.get_metrics();

  Option<Duration> timeout;
  if (getMetricsCall.has_timeout()) {
    timeout = Nanoseconds(getMetricsCall.timeout().nanoseconds());
  }

  return process::metrics::snapshot(timeout)
    .then([contentType](const map<string, double>& metrics) -> Response {
      // Serialize the following message:
      //
      //   v1::agent::Response response;
      //   response.set_type(v1::agent::Response::GET_METRICS);
      //   v1::agent::Response::GetMetrics* getMetrics = ...;

      switch (contentType) {
        case ContentType::PROTOBUF: {
          string bytes;
          google::protobuf::io::StringOutputStream stringStream(&bytes);
          google::protobuf::io::CodedOutputStream codedStream(&stringStream);

          WireFormatLite::WriteEnum(
              v1::agent::Response::kTypeFieldNumber,
              v1::agent::Response::GET_METRICS,
              &codedStream);

          WireFormatLite::WriteBytes(
              v1::agent::Response::kGetMetricsFieldNumber,
              serializeGetMetrics<v1::agent::Response::GetMetrics>(metrics),
              &codedStream);

          // The string is used before the coded output stream is
          // destructed, so the unused buffer space has to be trimmed
          // manually first.
          codedStream.Trim();

          return OK(std::move(bytes), stringify(contentType));
        }

        case ContentType::JSON: {
          string body = jsonify([&](JSON::ObjectWriter* object) {
            const google::protobuf::Descriptor* descriptor =
              v1::agent::Response::descriptor();

            // Field names are looked up from the message descriptor by
            // field number.
            const int typeField = v1::agent::Response::kTypeFieldNumber;
            object->field(
                descriptor->FindFieldByNumber(typeField)->name(),
                v1::agent::Response::Type_Name(
                    v1::agent::Response::GET_METRICS));

            const int metricsField =
              v1::agent::Response::kGetMetricsFieldNumber;
            object->field(
                descriptor->FindFieldByNumber(metricsField)->name(),
                jsonifyGetMetrics<v1::agent::Response::GetMetrics>(metrics));
          });

          // TODO(bmahler): Pass jsonp query parameter through here.
          return OK(std::move(body), stringify(contentType));
        }

        default:
          return NotAcceptable("Request must accept json or protobuf");
      }
    });
}
// Handler for the GET_LOGGING_LEVEL call: reports the current value of
// `FLAGS_v`, serialized in `acceptType`.
Future<Response> Http::getLoggingLevel(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_LOGGING_LEVEL, call.type());

  LOG(INFO) << "Processing GET_LOGGING_LEVEL call";

  mesos::agent::Response reply;
  reply.set_type(mesos::agent::Response::GET_LOGGING_LEVEL);

  auto* loggingLevel = reply.mutable_get_logging_level();
  loggingLevel->set_level(FLAGS_v);

  return OK(
      serialize(acceptType, evolve(reply)),
      stringify(acceptType));
}
// Handler for the SET_LOGGING_LEVEL call: if the principal is approved
// for SET_LOG_LEVEL, dispatches `Logging::set_level` with the requested
// level and duration and answers OK once that completes; otherwise
// answers Forbidden. The content type argument is unused.
Future<Response> Http::setLoggingLevel(
    const mesos::agent::Call& call,
    ContentType /*contentType*/,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::SET_LOGGING_LEVEL, call.type());
  CHECK(call.has_set_logging_level());

  const auto& setLevelCall = call.set_logging_level();

  uint32_t level = setLevelCall.level();
  Duration duration = Nanoseconds(setLevelCall.duration().nanoseconds());

  LOG(INFO) << "Processing SET_LOGGING_LEVEL call for level " << level;

  return ObjectApprovers::create(slave->authorizer, principal, {SET_LOG_LEVEL})
    .then([level, duration](
        const Owned<ObjectApprovers>& approvers) -> Future<Response> {
      const bool approved = approvers->approved<SET_LOG_LEVEL>();
      if (!approved) {
        return Forbidden();
      }

      return dispatch(
          process::logging(), &Logging::set_level, level, duration)
        .then([]() -> Response {
          return OK();
        });
    });
}
// Handles the LIST_FILES agent API call.
//
// Browses the path given in the call on behalf of `principal`. On success
// every returned `FileInfo` is copied into a LIST_FILES response that is
// serialized using `acceptType`. A browse error is mapped onto an HTTP
// error response according to its `FilesError::Type`.
Future<Response> Http::listFiles(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::LIST_FILES, call.type());

  const string& path = call.list_files().path();

  LOG(INFO) << "Processing LIST_FILES call for path '" << path << "'";

  return slave->files->browse(path, principal)
    .then([acceptType](const Try<list<FileInfo>, FilesError>& browsed)
              -> Future<Response> {
      // Success path first: wrap the listing into the response message.
      if (!browsed.isError()) {
        mesos::agent::Response reply;
        reply.set_type(mesos::agent::Response::LIST_FILES);

        mesos::agent::Response::ListFiles* listing =
          reply.mutable_list_files();

        foreach (const FileInfo& entry, browsed.get()) {
          listing->add_file_infos()->CopyFrom(entry);
        }

        return OK(
            serialize(acceptType, evolve(reply)), stringify(acceptType));
      }

      // Otherwise translate the failure category into an HTTP status.
      const FilesError& failure = browsed.error();

      switch (failure.type) {
        case FilesError::Type::INVALID:
          return BadRequest(failure.message);

        case FilesError::Type::UNAUTHORIZED:
          return Forbidden(failure.message);

        case FilesError::Type::NOT_FOUND:
          return NotFound(failure.message);

        case FilesError::Type::UNKNOWN:
          return InternalServerError(failure.message);
      }

      UNREACHABLE();
    });
}
// Returns the help text for the agent's state endpoint.
//
// The text consists of a one-line summary, a description with an example
// JSON response, and notes on authentication and authorization. The example
// is kept syntactically valid JSON (every member is comma-separated and no
// object ends with a dangling comma) so that readers can copy it as-is.
string Http::STATE_HELP() {
  return HELP(
    TLDR(
        "Information about state of the Agent."),
    DESCRIPTION(
        "This endpoint shows information about the frameworks, executors",
        "and the agent's master as a JSON object.",
        "The information shown might be filtered based on the user",
        "accessing the endpoint.",
        "",
        "Example (**Note**: this is not exhaustive):",
        "",
        "```",
        "{",
        " \"version\" : \"0.28.0\",",
        " \"git_sha\" : \"9d5889b5a265849886a533965f4aefefd1fbd103\",",
        " \"git_branch\" : \"refs/heads/master\",",
        " \"git_tag\" : \"0.28.0\",",
        " \"build_date\" : \"2016-02-15 10:00:28\",",
        " \"build_time\" : 1455559228,",
        " \"build_user\" : \"mesos-user\",",
        " \"start_time\" : 1455647422.88396,",
        " \"id\" : \"e2c38084-f6ea-496f-bce3-b6e07cea5e01-S0\",",
        " \"pid\" : \"slave(1)@127.0.1.1:5051\",",
        " \"hostname\" : \"localhost\",",
        " \"resources\" : {",
        " \"ports\" : \"[31000-32000]\",",
        " \"mem\" : 127816,",
        " \"disk\" : 804211,",
        " \"cpus\" : 32",
        " },",
        " \"attributes\" : {},",
        " \"master_hostname\" : \"localhost\",",
        " \"log_dir\" : \"/var/log\",",
        " \"external_log_file\" : \"mesos.log\",",
        " \"frameworks\" : [],",
        " \"completed_frameworks\" : [],",
        " \"flags\" : {",
        " \"gc_disk_headroom\" : \"0.1\",",
        " \"isolation\" : \"posix/cpu,posix/mem\",",
        " \"containerizers\" : \"mesos\",",
        " \"docker_socket\" : \"/var/run/docker.sock\",",
        " \"gc_delay\" : \"1weeks\",",
        " \"gc_non_executor_container_sandboxes\" : \"false\",",
        " \"docker_remove_delay\" : \"6hrs\",",
        " \"port\" : \"5051\",",
        " \"systemd_runtime_directory\" : \"/run/systemd/system\",",
        " \"initialize_driver_logging\" : \"true\",",
        " \"cgroups_root\" : \"mesos\",",
        " \"fetcher_cache_size\" : \"2GB\",",
        " \"cgroups_hierarchy\" : \"/sys/fs/cgroup\",",
        " \"qos_correction_interval_min\" : \"0ns\",",
        " \"cgroups_cpu_enable_pids_and_tids_count\" : \"false\",",
        " \"sandbox_directory\" : \"/mnt/mesos/sandbox\",",
        " \"docker\" : \"docker\",",
        " \"help\" : \"false\",",
        " \"docker_stop_timeout\" : \"0ns\",",
        " \"master\" : \"127.0.0.1:5050\",",
        " \"logbufsecs\" : \"0\",",
        " \"docker_registry\" : \"https://registry-1.docker.io\",",
        " \"frameworks_home\" : \"\",",
        " \"cgroups_enable_cfs\" : \"false\",",
        " \"perf_interval\" : \"1mins\",",
        " \"docker_kill_orphans\" : \"true\",",
        " \"switch_user\" : \"true\",",
        " \"logging_level\" : \"INFO\",",
        " \"strict\" : \"true\",",
        " \"executor_registration_timeout\" : \"1mins\",",
        " \"recovery_timeout\" : \"15mins\",",
        " \"revocable_cpu_low_priority\" : \"true\",",
        " \"docker_store_dir\" : \"/tmp/mesos/store/docker\",",
        " \"image_provisioner_backend\" : \"copy\",",
        " \"authenticatee\" : \"crammd5\",",
        " \"quiet\" : \"false\",",
        " \"executor_shutdown_grace_period\" : \"5secs\",",
        " \"fetcher_cache_dir\" : \"/tmp/mesos/fetch\",",
        " \"default_role\" : \"*\",",
        " \"work_dir\" : \"/tmp/mesos\",",
        " \"launcher_dir\" : \"/path/to/mesos/build/src\",",
        " \"registration_backoff_factor\" : \"1secs\",",
        " \"oversubscribed_resources_interval\" : \"15secs\",",
        " \"enforce_container_disk_quota\" : \"false\",",
        " \"container_disk_watch_interval\" : \"15secs\",",
        " \"disk_watch_interval\" : \"1mins\",",
        " \"cgroups_limit_swap\" : \"false\",",
        " \"hostname_lookup\" : \"true\",",
        " \"perf_duration\" : \"10secs\",",
        " \"appc_store_dir\" : \"/tmp/mesos/store/appc\",",
        " \"recover\" : \"reconnect\",",
        " \"version\" : \"false\"",
        " }",
        "}",
        "```"),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "This endpoint might be filtered based on the user accessing it.",
        "For example a user might only see the subset of frameworks,",
        "tasks, and executors they are allowed to view.",
        "See the authorization documentation for details."));
}
// Serves the agent's state endpoint.
//
// Responds with `ServiceUnavailable` while the agent is still recovering.
// Otherwise a JSON object describing the agent is produced: build and
// identity information, resources, resource providers, attributes, the
// master's hostname, flags and (completed) frameworks. Role-, flag-,
// resource-provider- and framework-specific parts are only written when
// the approvers created for `principal` allow it. The `jsonp` query
// parameter of `request` is handed to the `OK` response.
Future<Response> Http::state(
    const Request& request,
    const Option<Principal>& principal) const
{
  if (slave->state == Slave::RECOVERING) {
    return ServiceUnavailable("Agent has not finished recovery");
  }

  return ObjectApprovers::create(
      slave->authorizer,
      principal,
      {VIEW_FRAMEWORK,
       VIEW_TASK,
       VIEW_EXECUTOR,
       VIEW_FLAGS,
       VIEW_ROLE,
       VIEW_RESOURCE_PROVIDER})
    .then(defer(
        slave->self(),
        [this, request](const Owned<ObjectApprovers>& approvers) -> Response {
          // `writeState` is fully consumed by `jsonify` below, i.e. before
          // this enclosing lambda returns, so capturing `approvers` by
          // reference is safe.
          auto writeState = [this, &approvers](JSON::ObjectWriter* writer) {
            // Produces a writer emitting one `role -> resources` field for
            // every reservation role of `pool` the principal may view.
            auto reservationsByRole = [&approvers](const Resources& pool) {
              return [&pool, &approvers](JSON::ObjectWriter* writer) {
                foreachpair (const string& role,
                             const Resources& reserved,
                             pool.reservations()) {
                  if (approvers->approved<VIEW_ROLE>(role)) {
                    writer->field(role, reserved);
                  }
                }
              };
            };

            // Build information.
            writer->field("version", MESOS_VERSION);

            if (build::GIT_SHA.isSome()) {
              writer->field("git_sha", build::GIT_SHA.get());
            }

            if (build::GIT_BRANCH.isSome()) {
              writer->field("git_branch", build::GIT_BRANCH.get());
            }

            if (build::GIT_TAG.isSome()) {
              writer->field("git_tag", build::GIT_TAG.get());
            }

            writer->field("build_date", build::DATE);
            writer->field("build_time", build::TIME);
            writer->field("build_user", build::USER);

            // Identity of this agent.
            writer->field("start_time", slave->startTime.secs());
            writer->field("id", slave->info.id().value());
            writer->field("pid", string(slave->self()));
            writer->field("hostname", slave->info.hostname());
            writer->field(
                "capabilities",
                slave->capabilities.toRepeatedPtrField());

            if (slave->info.has_domain()) {
              writer->field("domain", slave->info.domain());
            }

            // Draining information; the estimated start time is only
            // reported alongside a drain config.
            if (slave->drainConfig.isSome()) {
              writer->field(
                  "drain_config",
                  JSON::Protobuf(slave->drainConfig.get()));

              if (slave->estimatedDrainStartTime.isSome()) {
                writer->field(
                    "estimated_drain_start_time_seconds",
                    static_cast<int64_t>(
                        slave->estimatedDrainStartTime->secs()));
              }
            }

            // Total resources, split into reserved (per role) and
            // unreserved, in both the summarized and the full form.
            const Resources& totalResources = slave->totalResources;

            writer->field("resources", totalResources);

            writer->field(
                "reserved_resources",
                reservationsByRole(totalResources));

            writer->field(
                "unreserved_resources",
                totalResources.unreserved());

            writer->field(
                "reserved_resources_full",
                [&totalResources, &approvers](JSON::ObjectWriter* writer) {
                  foreachpair (const string& role,
                               const Resources& reserved,
                               totalResources.reservations()) {
                    if (approvers->approved<VIEW_ROLE>(role)) {
                      writer->field(
                          role,
                          [&reserved](JSON::ArrayWriter* writer) {
                            // Iterate by value: the resource is converted
                            // in place before being written.
                            foreach (Resource item, reserved) {
                              convertResourceFormat(&item, ENDPOINT);
                              writer->element(JSON::Protobuf(item));
                            }
                          });
                    }
                  }
                });

            writer->field(
                "unreserved_resources_full",
                [&totalResources](JSON::ArrayWriter* writer) {
                  foreach (Resource item, totalResources.unreserved()) {
                    convertResourceFormat(&item, ENDPOINT);
                    writer->element(JSON::Protobuf(item));
                  }
                });

            // TODO(abudnik): Consider storing the allocatedResources in the
            // Slave struct rather than computing it here each time.
            Resources allocatedResources;
            foreachvalue (const Framework* active, slave->frameworks) {
              allocatedResources += active->allocatedResources();
            }

            writer->field(
                "reserved_resources_allocated",
                reservationsByRole(allocatedResources));

            writer->field(
                "unreserved_resources_allocated",
                allocatedResources.unreserved());

            // Resource providers; the array stays empty when the principal
            // may not view them.
            writer->field(
                "resource_providers",
                [this, &approvers](JSON::ArrayWriter* writer) {
                  if (approvers->approved<VIEW_RESOURCE_PROVIDER>()) {
                    foreachvalue (
                        ResourceProvider* source,
                        slave->resourceProviders) {
                      agent::Response::GetResourceProviders::ResourceProvider
                        entry;

                      *entry.mutable_resource_provider_info() = source->info;
                      *entry.mutable_total_resources() =
                        source->totalResources;

                      writer->element(JSON::Protobuf(entry));
                    }
                  }
                });

            writer->field("attributes", Attributes(slave->info.attributes()));

            // The master's hostname is only written when a master is known
            // and its address resolves to a hostname.
            if (slave->master.isSome()) {
              Try<string> masterHostname =
                net::getHostname(slave->master->address.ip);

              if (masterHostname.isSome()) {
                writer->field("master_hostname", masterHostname.get());
              }
            }

            // Log locations and flags require VIEW_FLAGS.
            if (approvers->approved<VIEW_FLAGS>()) {
              if (slave->flags.log_dir.isSome()) {
                writer->field("log_dir", slave->flags.log_dir.get());
              }

              if (slave->flags.external_log_file.isSome()) {
                writer->field(
                    "external_log_file",
                    slave->flags.external_log_file.get());
              }

              writer->field("flags", [this](JSON::ObjectWriter* writer) {
                foreachvalue (const flags::Flag& entry, slave->flags) {
                  Option<string> rendered = entry.stringify(slave->flags);
                  if (rendered.isSome()) {
                    writer->field(
                        entry.effective_name().value, rendered.get());
                  }
                }
              });
            }

            // Active frameworks the principal is authorized to view.
            writer->field(
                "frameworks",
                [this, &approvers](JSON::ArrayWriter* writer) {
                  foreachvalue (Framework* active, slave->frameworks) {
                    if (approvers->approved<VIEW_FRAMEWORK>(active->info)) {
                      writer->element(FrameworkWriter(approvers, active));
                    }
                  }
                });

            // Completed frameworks the principal is authorized to view.
            writer->field(
                "completed_frameworks",
                [this, &approvers](JSON::ArrayWriter* writer) {
                  foreachvalue (const Owned<Framework>& completed,
                                slave->completedFrameworks) {
                    if (approvers->approved<VIEW_FRAMEWORK>(
                            completed->info)) {
                      writer->element(
                          FrameworkWriter(approvers, completed.get()));
                    }
                  }
                });
          };

          return OK(jsonify(writeState), request.url.query.get("jsonp"));
        }));
}
// Handles the GET_FRAMEWORKS agent API call.
//
// After approvers for VIEW_FRAMEWORK have been created, the continuation
// deferred to `slave->self()` writes the equivalent of
//
//   v1::agent::Response response;
//   response.set_type(v1::agent::Response::GET_FRAMEWORKS);
//   *response.mutable_get_frameworks() = ...;
//
// directly as protobuf or JSON, depending on `contentType`. Any other
// content type yields `NotAcceptable`.
Future<Response> Http::getFrameworks(
    const mesos::agent::Call& call,
    ContentType contentType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_FRAMEWORKS, call.type());

  LOG(INFO) << "Processing GET_FRAMEWORKS call";

  return ObjectApprovers::create(slave->authorizer, principal, {VIEW_FRAMEWORK})
    .then(defer(
        slave->self(),
        [this, contentType](
            const Owned<ObjectApprovers>& approvers) -> Response {
          if (contentType == ContentType::PROTOBUF) {
            string payload;
            google::protobuf::io::StringOutputStream sink(&payload);
            google::protobuf::io::CodedOutputStream coded(&sink);

            WireFormatLite::WriteEnum(
                v1::agent::Response::kTypeFieldNumber,
                v1::agent::Response::GET_FRAMEWORKS,
                &coded);

            WireFormatLite::WriteBytes(
                v1::agent::Response::kGetFrameworksFieldNumber,
                serializeGetFrameworks(approvers),
                &coded);

            // The coded stream is still alive when `payload` is handed
            // off below, so the unused buffer space has to be trimmed
            // explicitly first.
            coded.Trim();

            return OK(std::move(payload), stringify(contentType));
          }

          if (contentType == ContentType::JSON) {
            string body = jsonify([&](JSON::ObjectWriter* writer) {
              const google::protobuf::Descriptor* descriptor =
                v1::agent::Response::descriptor();

              const int typeField = v1::agent::Response::kTypeFieldNumber;
              const int payloadField =
                v1::agent::Response::kGetFrameworksFieldNumber;

              writer->field(
                  descriptor->FindFieldByNumber(typeField)->name(),
                  v1::agent::Response::Type_Name(
                      v1::agent::Response::GET_FRAMEWORKS));

              writer->field(
                  descriptor->FindFieldByNumber(payloadField)->name(),
                  jsonifyGetFrameworks(approvers));
            });

            // TODO(bmahler): Pass jsonp query parameter through here.
            return OK(std::move(body), stringify(contentType));
          }

          return NotAcceptable("Request must accept json or protobuf");
        }));
}
// Returns a JSON object writer producing the equivalent of:
//
//   v1::agent::Response::GetFrameworks getFrameworks;
//
//   for each framework:
//     *getFrameworks.add_frameworks()
//       ->mutable_framework_info() = ...;
//
//   for each completed framework:
//     *getFrameworks.add_completed_frameworks()
//       ->mutable_framework_info() = ...;
//
// Only frameworks approved for VIEW_FRAMEWORK by `approvers` are written.
function<void(JSON::ObjectWriter*)> Http::jsonifyGetFrameworks(
    const Owned<ObjectApprovers>& approvers) const
{
  // Produces a writer for a single element, equivalent to:
  //
  //   v1::agent::Response::GetFrameworks::Framework framework;
  //   *framework.mutable_framework_info() = frameworkInfo;
  auto frameworkEntry = [](const FrameworkInfo& info) {
    return [&info](JSON::ObjectWriter* writer) {
      const google::protobuf::Descriptor* descriptor =
        v1::agent::Response::GetFrameworks::Framework::descriptor();

      const int infoField = v1::agent::Response::GetFrameworks::Framework
        ::kFrameworkInfoFieldNumber;

      writer->field(
          descriptor->FindFieldByNumber(infoField)->name(),
          asV1Protobuf(info));
    };
  };

  // TODO(bmahler): This copies the owned object approvers.
  return [=](JSON::ObjectWriter* writer) {
    const google::protobuf::Descriptor* descriptor =
      v1::agent::Response::GetFrameworks::descriptor();

    const int activeField =
      v1::agent::Response::GetFrameworks::kFrameworksFieldNumber;
    const int completedField =
      v1::agent::Response::GetFrameworks::kCompletedFrameworksFieldNumber;

    // Active frameworks.
    writer->field(
        descriptor->FindFieldByNumber(activeField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreachvalue (const Framework* active, slave->frameworks) {
            if (approvers->approved<VIEW_FRAMEWORK>(active->info)) {
              writer->element(frameworkEntry(active->info));
            }
          }
        });

    // Completed frameworks.
    writer->field(
        descriptor->FindFieldByNumber(completedField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreachvalue (const Owned<Framework>& completed,
                        slave->completedFrameworks) {
            if (approvers->approved<VIEW_FRAMEWORK>(completed->info)) {
              writer->element(frameworkEntry(completed->info));
            }
          }
        });
  };
}
// Returns the protobuf wire-format bytes equivalent to:
//
//   v1::agent::Response::GetFrameworks getFrameworks;
//
//   for each framework:
//     *getFrameworks.add_frameworks()
//       ->mutable_framework_info() = ...;
//
//   for each completed framework:
//     *getFrameworks.add_completed_frameworks()
//       ->mutable_framework_info() = ...;
//
// Only frameworks approved for VIEW_FRAMEWORK by `approvers` are written;
// active frameworks are emitted before completed ones.
string Http::serializeGetFrameworks(
    const Owned<ObjectApprovers>& approvers) const
{
  // Serializes a single element, equivalent to:
  //
  //   v1::agent::Response::GetFrameworks::Framework framework;
  //   *framework.mutable_framework_info() = frameworkInfo;
  auto serializeFramework = [](const FrameworkInfo& info) {
    string bytes;
    google::protobuf::io::StringOutputStream sink(&bytes);
    google::protobuf::io::CodedOutputStream coded(&sink);

    WireFormatLite2::WriteMessageWithoutCachedSizes(
        v1::agent::Response::GetFrameworks::Framework
          ::kFrameworkInfoFieldNumber,
        info,
        &coded);

    // Trimming is done explicitly on purpose: forgetting it where it is
    // required leads to a bug that is hard to diagnose, so it is spelled
    // out everywhere a coded stream writes into a string.
    coded.Trim();

    return bytes;
  };

  string output;
  google::protobuf::io::StringOutputStream stream(&output);
  google::protobuf::io::CodedOutputStream writer(&stream);

  // Writes `info` under `fieldNumber` if the principal may view it.
  auto writeIfApproved = [&](int fieldNumber, const FrameworkInfo& info) {
    if (approvers->approved<VIEW_FRAMEWORK>(info)) {
      WireFormatLite::WriteBytes(
          fieldNumber,
          serializeFramework(info),
          &writer);
    }
  };

  foreachvalue (const Framework* active, slave->frameworks) {
    writeIfApproved(
        v1::agent::Response::GetFrameworks::kFrameworksFieldNumber,
        active->info);
  }

  foreachvalue (const Owned<Framework>& completed,
                slave->completedFrameworks) {
    writeIfApproved(
        v1::agent::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
        completed->info);
  }

  // See the note on explicit trimming above.
  writer.Trim();

  return output;
}
// Handles the GET_EXECUTORS agent API call.
//
// After approvers for VIEW_FRAMEWORK and VIEW_EXECUTOR have been created,
// the continuation deferred to `slave->self()` writes the equivalent of
//
//   v1::agent::Response response;
//   response.set_type(v1::agent::Response::GET_EXECUTORS);
//   *response.mutable_get_executors() = ...;
//
// directly as protobuf or JSON, depending on `contentType`. Any other
// content type yields `NotAcceptable`.
Future<Response> Http::getExecutors(
    const mesos::agent::Call& call,
    ContentType contentType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_EXECUTORS, call.type());

  LOG(INFO) << "Processing GET_EXECUTORS call";

  return ObjectApprovers::create(
      slave->authorizer,
      principal,
      {VIEW_FRAMEWORK, VIEW_EXECUTOR})
    .then(defer(
        slave->self(),
        [this, contentType](
            const Owned<ObjectApprovers>& approvers) -> Response {
          if (contentType == ContentType::PROTOBUF) {
            string payload;
            google::protobuf::io::StringOutputStream sink(&payload);
            google::protobuf::io::CodedOutputStream coded(&sink);

            WireFormatLite::WriteEnum(
                v1::agent::Response::kTypeFieldNumber,
                v1::agent::Response::GET_EXECUTORS,
                &coded);

            WireFormatLite::WriteBytes(
                v1::agent::Response::kGetExecutorsFieldNumber,
                serializeGetExecutors(approvers),
                &coded);

            // The coded stream is still alive when `payload` is handed
            // off below, so the unused buffer space has to be trimmed
            // explicitly first.
            coded.Trim();

            return OK(std::move(payload), stringify(contentType));
          }

          if (contentType == ContentType::JSON) {
            string body = jsonify([&](JSON::ObjectWriter* writer) {
              const google::protobuf::Descriptor* descriptor =
                v1::agent::Response::descriptor();

              const int typeField = v1::agent::Response::kTypeFieldNumber;
              const int payloadField =
                v1::agent::Response::kGetExecutorsFieldNumber;

              writer->field(
                  descriptor->FindFieldByNumber(typeField)->name(),
                  v1::agent::Response::Type_Name(
                      v1::agent::Response::GET_EXECUTORS));

              writer->field(
                  descriptor->FindFieldByNumber(payloadField)->name(),
                  jsonifyGetExecutors(approvers));
            });

            // TODO(bmahler): Pass jsonp query parameter through here.
            return OK(std::move(body), stringify(contentType));
          }

          return NotAcceptable("Request must accept json or protobuf");
        }));
}
// Returns a JSON object writer producing the equivalent of:
//
//   v1::agent::Response::GetExecutors getExecutors;
//   for each executor:
//     *getExecutors.add_executors() = executor;
//   for each completed executor:
//     *getExecutors.add_completed_executors() = completed executor;
//
// Executors are taken from both active and completed frameworks. An
// executor is written only if its framework is approved for
// VIEW_FRAMEWORK and the executor itself for VIEW_EXECUTOR.
function<void(JSON::ObjectWriter*)> Http::jsonifyGetExecutors(
    const Owned<ObjectApprovers>& approvers) const
{
  return [=](JSON::ObjectWriter* writer) {
    // Gather the viewable frameworks: active ones first, then completed.
    vector<const Framework*> visible;

    foreachvalue (const Framework* active, slave->frameworks) {
      if (approvers->approved<VIEW_FRAMEWORK>(active->info)) {
        visible.push_back(active);
      }
    }

    foreachvalue (const Owned<Framework>& completed,
                  slave->completedFrameworks) {
      if (approvers->approved<VIEW_FRAMEWORK>(completed->info)) {
        visible.push_back(completed.get());
      }
    }

    // Produces a writer for a single element, equivalent to:
    //
    //   v1::agent::Response::GetExecutors::Executor executor;
    //   *executor.mutable_executor_info() = executorInfo;
    auto executorEntry = [](const ExecutorInfo& info) {
      return [&info](JSON::ObjectWriter* writer) {
        const google::protobuf::Descriptor* descriptor =
          v1::agent::Response::GetExecutors::Executor::descriptor();

        const int infoField = v1::agent::Response::GetExecutors::Executor
          ::kExecutorInfoFieldNumber;

        writer->field(
            descriptor->FindFieldByNumber(infoField)->name(),
            asV1Protobuf(info));
      };
    };

    const google::protobuf::Descriptor* descriptor =
      v1::agent::Response::GetExecutors::descriptor();

    const int activeField =
      v1::agent::Response::GetExecutors::kExecutorsFieldNumber;
    const int completedField =
      v1::agent::Response::GetExecutors::kCompletedExecutorsFieldNumber;

    // Active executors of every viewable framework.
    writer->field(
        descriptor->FindFieldByNumber(activeField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreach (const Framework* owner, visible) {
            foreachvalue (const Executor* running, owner->executors) {
              if (approvers->approved<VIEW_EXECUTOR>(
                      running->info, owner->info)) {
                writer->element(executorEntry(running->info));
              }
            }
          }
        });

    // Completed executors of every viewable framework.
    writer->field(
        descriptor->FindFieldByNumber(completedField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreach (const Framework* owner, visible) {
            foreach (const Owned<Executor>& finished,
                     owner->completedExecutors) {
              if (approvers->approved<VIEW_EXECUTOR>(
                      finished->info, owner->info)) {
                writer->element(executorEntry(finished->info));
              }
            }
          }
        });
  };
}
// Returns the protobuf wire-format bytes equivalent to:
//
//   v1::agent::Response::GetExecutors getExecutors;
//   for each executor:
//     *getExecutors.add_executors() = executor;
//   for each completed executor:
//     *getExecutors.add_completed_executors() = completed executor;
//
// Executors are taken from both active and completed frameworks. An
// executor is written only if its framework is approved for
// VIEW_FRAMEWORK and the executor itself for VIEW_EXECUTOR. For each
// framework, its active executors are emitted before its completed ones.
string Http::serializeGetExecutors(
    const Owned<ObjectApprovers>& approvers) const
{
  // Gather the viewable frameworks: active ones first, then completed.
  vector<const Framework*> visible;

  foreachvalue (Framework* active, slave->frameworks) {
    if (approvers->approved<VIEW_FRAMEWORK>(active->info)) {
      visible.push_back(active);
    }
  }

  foreachvalue (const Owned<Framework>& completed,
                slave->completedFrameworks) {
    if (approvers->approved<VIEW_FRAMEWORK>(completed->info)) {
      visible.push_back(completed.get());
    }
  }

  // Serializes a single element, equivalent to:
  //
  //   v1::agent::Response::GetExecutors::Executor executor;
  //   *executor.mutable_executor_info() = executorInfo;
  auto serializeExecutor = [](const ExecutorInfo& info) {
    string bytes;
    google::protobuf::io::StringOutputStream sink(&bytes);
    google::protobuf::io::CodedOutputStream coded(&sink);

    WireFormatLite2::WriteMessageWithoutCachedSizes(
        v1::agent::Response::GetExecutors::Executor::kExecutorInfoFieldNumber,
        info,
        &coded);

    // Trimming is done explicitly on purpose: forgetting it where it is
    // required leads to a bug that is hard to diagnose, so it is spelled
    // out everywhere a coded stream writes into a string.
    coded.Trim();

    return bytes;
  };

  string output;
  google::protobuf::io::StringOutputStream stream(&output);
  google::protobuf::io::CodedOutputStream writer(&stream);

  foreach (const Framework* owner, visible) {
    // Active executors of this framework.
    foreachvalue (Executor* running, owner->executors) {
      const bool viewable =
        approvers->approved<VIEW_EXECUTOR>(running->info, owner->info);

      if (viewable) {
        WireFormatLite::WriteBytes(
            v1::agent::Response::GetExecutors::kExecutorsFieldNumber,
            serializeExecutor(running->info),
            &writer);
      }
    }

    // Completed executors of this framework.
    foreach (const Owned<Executor>& finished, owner->completedExecutors) {
      const bool viewable =
        approvers->approved<VIEW_EXECUTOR>(finished->info, owner->info);

      if (viewable) {
        WireFormatLite::WriteBytes(
            v1::agent::Response::GetExecutors::kCompletedExecutorsFieldNumber,
            serializeExecutor(finished->info),
            &writer);
      }
    }
  }

  // See the note on explicit trimming above.
  writer.Trim();

  return output;
}
// Handles the GET_OPERATIONS agent API call.
//
// After approvers for VIEW_ROLE have been created, the continuation
// deferred to `slave->self()` copies every viewable operation known to the
// agent into a GET_OPERATIONS response, which is then evolved and
// serialized using `acceptType`.
Future<Response> Http::getOperations(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_OPERATIONS, call.type());

  LOG(INFO) << "Processing GET_OPERATIONS call";

  return ObjectApprovers::create(slave->authorizer, principal, {VIEW_ROLE})
    .then(defer(
        slave->self(),
        [=](const Owned<ObjectApprovers>& approvers) -> Response {
          // An operation is viewable if the principal is authorized to
          // view every resource the operation consumes. Operations whose
          // consumed resources cannot be determined are not viewable.
          auto mayView = [&approvers](const Operation& candidate) {
            Try<Resources> consumed =
              protobuf::getConsumedResources(candidate.info());

            if (consumed.isError()) {
              LOG(WARNING)
                << "Could not approve operation " << candidate.uuid()
                << " since its consumed resources could not be determined: "
                << consumed.error();

              return false;
            }

            foreach (const Resource& item, consumed.get()) {
              if (!approvers->approved<VIEW_ROLE>(item)) {
                return false;
              }
            }

            return true;
          };

          agent::Response reply;
          reply.set_type(mesos::agent::Response::GET_OPERATIONS);

          agent::Response::GetOperations* listing =
            reply.mutable_get_operations();

          foreachvalue (Operation* known, slave->operations) {
            if (mayView(*known)) {
              listing->add_operations()->CopyFrom(*known);
            }
          }

          return OK(
              serialize(acceptType, evolve(reply)), stringify(acceptType));
        }));
}
// Handles the GET_TASKS agent API call.
//
// After approvers for VIEW_FRAMEWORK, VIEW_TASK and VIEW_EXECUTOR have
// been created, the continuation deferred to `slave->self()` writes the
// equivalent of
//
//   v1::agent::Response response;
//   response.set_type(v1::agent::Response::GET_TASKS);
//   *response.mutable_get_tasks() = ...;
//
// directly as protobuf or JSON, depending on `contentType`. Any other
// content type yields `NotAcceptable`.
Future<Response> Http::getTasks(
    const mesos::agent::Call& call,
    ContentType contentType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_TASKS, call.type());

  LOG(INFO) << "Processing GET_TASKS call";

  return ObjectApprovers::create(
      slave->authorizer,
      principal,
      {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
    .then(defer(
        slave->self(),
        [this, contentType](
            const Owned<ObjectApprovers>& approvers) -> Response {
          if (contentType == ContentType::PROTOBUF) {
            string payload;
            google::protobuf::io::StringOutputStream sink(&payload);
            google::protobuf::io::CodedOutputStream coded(&sink);

            WireFormatLite::WriteEnum(
                v1::agent::Response::kTypeFieldNumber,
                v1::agent::Response::GET_TASKS,
                &coded);

            WireFormatLite::WriteBytes(
                v1::agent::Response::kGetTasksFieldNumber,
                serializeGetTasks(approvers),
                &coded);

            // The coded stream is still alive when `payload` is handed
            // off below, so the unused buffer space has to be trimmed
            // explicitly first.
            coded.Trim();

            return OK(std::move(payload), stringify(contentType));
          }

          if (contentType == ContentType::JSON) {
            string body = jsonify([&](JSON::ObjectWriter* writer) {
              const google::protobuf::Descriptor* descriptor =
                v1::agent::Response::descriptor();

              const int typeField = v1::agent::Response::kTypeFieldNumber;
              const int payloadField =
                v1::agent::Response::kGetTasksFieldNumber;

              writer->field(
                  descriptor->FindFieldByNumber(typeField)->name(),
                  v1::agent::Response::Type_Name(
                      v1::agent::Response::GET_TASKS));

              writer->field(
                  descriptor->FindFieldByNumber(payloadField)->name(),
                  jsonifyGetTasks(approvers));
            });

            // TODO(bmahler): Pass jsonp query parameter through here.
            return OK(std::move(body), stringify(contentType));
          }

          return NotAcceptable("Request must accept json or protobuf");
        }));
}
// Returns a JSON object writer producing the equivalent of:
//
//   v1::agent::Response::GetTasks getTasks;
//
//   for each pending task:
//     *getTasks.add_pending_tasks() = task
//   for each queued task:
//     *getTasks.add_queued_tasks() = *task;
//   for each launched task:
//     *getTasks.add_launched_tasks() = *task;
//   for each terminated task:
//     *getTasks.add_terminated_tasks() = *task;
//   for each completed task:
//     *getTasks.add_completed_tasks() = *task;
//
// A task is written only if it passes the VIEW_TASK check and belongs to
// a framework approved for VIEW_FRAMEWORK; all but pending tasks must in
// addition belong to an executor approved for VIEW_EXECUTOR.
function<void(JSON::ObjectWriter*)> Http::jsonifyGetTasks(
    const Owned<ObjectApprovers>& approvers) const
{
  return [=](JSON::ObjectWriter* writer) {
    // Gather the viewable frameworks: active ones first, then completed.
    vector<const Framework*> visible;

    foreachvalue (Framework* active, slave->frameworks) {
      if (approvers->approved<VIEW_FRAMEWORK>(active->info)) {
        visible.push_back(active);
      }
    }

    foreachvalue (const Owned<Framework>& completed,
                  slave->completedFrameworks) {
      if (approvers->approved<VIEW_FRAMEWORK>(completed->info)) {
        visible.push_back(completed.get());
      }
    }

    // Map every viewable executor (active or completed) to the framework
    // it belongs to.
    hashmap<const Executor*, const Framework*> executorOwners;

    foreach (const Framework* owner, visible) {
      foreachvalue (const Executor* running, owner->executors) {
        if (approvers->approved<VIEW_EXECUTOR>(running->info, owner->info)) {
          executorOwners.put(running, owner);
        }
      }

      foreach (const Owned<Executor>& finished, owner->completedExecutors) {
        if (approvers->approved<VIEW_EXECUTOR>(finished->info, owner->info)) {
          executorOwners.put(finished.get(), owner);
        }
      }
    }

    const google::protobuf::Descriptor* descriptor =
      v1::agent::Response::GetTasks::descriptor();

    const int pendingField =
      v1::agent::Response::GetTasks::kPendingTasksFieldNumber;
    const int queuedField =
      v1::agent::Response::GetTasks::kQueuedTasksFieldNumber;
    const int launchedField =
      v1::agent::Response::GetTasks::kLaunchedTasksFieldNumber;
    const int terminatedField =
      v1::agent::Response::GetTasks::kTerminatedTasksFieldNumber;
    const int completedField =
      v1::agent::Response::GetTasks::kCompletedTasksFieldNumber;

    // Pending tasks are tracked per framework.
    writer->field(
        descriptor->FindFieldByNumber(pendingField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreach (const Framework* owner, visible) {
            // The alias keeps the comma of the template arguments out of
            // the `foreachvalue` macro invocation.
            using TaskMap = hashmap<TaskID, TaskInfo>;

            foreachvalue (const TaskMap& pendingGroup, owner->pendingTasks) {
              foreachvalue (const TaskInfo& pending, pendingGroup) {
                if (approvers->approved<VIEW_TASK>(pending, owner->info)) {
                  // TODO(bmahler): Consider not constructing the temporary
                  // task object and instead jsonify directly. Since we
                  // don't expect a large number of pending tasks, we
                  // currently don't bother with the more efficient
                  // approach.
                  Task staged = protobuf::createTask(
                      pending, TASK_STAGING, owner->id());

                  writer->element(asV1Protobuf(staged));
                }
              }
            }
          }
        });

    // Queued tasks are tracked per executor.
    writer->field(
        descriptor->FindFieldByNumber(queuedField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreachpair (const Executor* executor,
                       const Framework* owner,
                       executorOwners) {
            foreachvalue (const TaskInfo& queued, executor->queuedTasks) {
              if (approvers->approved<VIEW_TASK>(queued, owner->info)) {
                // TODO(bmahler): Consider not constructing the temporary
                // task object and instead jsonify directly. Since we
                // don't expect a large number of queued tasks, we
                // currently don't bother with the more efficient
                // approach.
                Task staged = protobuf::createTask(
                    queued, TASK_STAGING, owner->id());

                writer->element(asV1Protobuf(staged));
              }
            }
          }
        });

    // Launched tasks.
    writer->field(
        descriptor->FindFieldByNumber(launchedField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreachpair (const Executor* executor,
                       const Framework* owner,
                       executorOwners) {
            foreachvalue (Task* launched, executor->launchedTasks) {
              if (approvers->approved<VIEW_TASK>(*launched, owner->info)) {
                writer->element(asV1Protobuf(*launched));
              }
            }
          }
        });

    // Terminated tasks.
    writer->field(
        descriptor->FindFieldByNumber(terminatedField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreachpair (const Executor* executor,
                       const Framework* owner,
                       executorOwners) {
            foreachvalue (Task* terminated, executor->terminatedTasks) {
              if (approvers->approved<VIEW_TASK>(*terminated, owner->info)) {
                writer->element(asV1Protobuf(*terminated));
              }
            }
          }
        });

    // Completed tasks.
    writer->field(
        descriptor->FindFieldByNumber(completedField)->name(),
        [&](JSON::ArrayWriter* writer) {
          foreachpair (const Executor* executor,
                       const Framework* owner,
                       executorOwners) {
            foreach (const std::shared_ptr<Task>& done,
                     executor->completedTasks) {
              if (approvers->approved<VIEW_TASK>(*done.get(), owner->info)) {
                writer->element(asV1Protobuf(*done));
              }
            }
          }
        });
  };
}
// Encodes a `v1::agent::Response::GetTasks` message directly in the
// protobuf wire format. Only frameworks, executors and tasks that
// `approvers` approves for viewing contribute to the result.
//
// The bytes produced correspond to:
//
//   v1::agent::Response::GetTasks getTasks;
//
//   for each pending task:    *getTasks.add_pending_tasks() = task;
//   for each queued task:     *getTasks.add_queued_tasks() = task;
//   for each launched task:   *getTasks.add_launched_tasks() = *task;
//   for each terminated task: *getTasks.add_terminated_tasks() = *task;
//   for each completed task:  *getTasks.add_completed_tasks() = *task;
string Http::serializeGetTasks(
    const Owned<ObjectApprovers>& approvers) const
{
  // Viewable frameworks: the active ones followed by the completed ones.
  vector<const Framework*> viewableFrameworks;

  foreachvalue (Framework* active, slave->frameworks) {
    if (!approvers->approved<VIEW_FRAMEWORK>(active->info)) {
      continue;
    }

    viewableFrameworks.push_back(active);
  }

  foreachvalue (
      const Owned<Framework>& completed, slave->completedFrameworks) {
    if (!approvers->approved<VIEW_FRAMEWORK>(completed->info)) {
      continue;
    }

    viewableFrameworks.push_back(completed.get());
  }

  // Viewable executors (active and completed), each mapped to the
  // framework it was found under.
  hashmap<const Executor*, const Framework*> viewableExecutors;

  foreach (const Framework* owner, viewableFrameworks) {
    foreachvalue (Executor* active, owner->executors) {
      if (!approvers->approved<VIEW_EXECUTOR>(active->info, owner->info)) {
        continue;
      }

      viewableExecutors.put(active, owner);
    }

    foreach (const Owned<Executor>& completed, owner->completedExecutors) {
      if (!approvers->approved<VIEW_EXECUTOR>(completed->info, owner->info)) {
        continue;
      }

      viewableExecutors.put(completed.get(), owner);
    }
  }

  string encoded;
  google::protobuf::io::StringOutputStream sink(&encoded);
  google::protobuf::io::CodedOutputStream coded(&sink);

  // Appends `task` to the output as one entry of the field
  // identified by `fieldNumber`.
  auto appendTask = [&coded](int fieldNumber, const Task& task) {
    WireFormatLite2::WriteMessageWithoutCachedSizes(
        fieldNumber,
        task,
        &coded);
  };

  // Alias needed because a type containing a comma cannot be
  // spelled inside the `foreachvalue` macro arguments.
  using PendingTaskMap = hashmap<TaskID, TaskInfo>;

  // Pending tasks are tracked per framework.
  foreach (const Framework* framework, viewableFrameworks) {
    foreachvalue (const PendingTaskMap& pending, framework->pendingTasks) {
      foreachvalue (const TaskInfo& taskInfo, pending) {
        if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
          // TODO(bmahler): Consider serializing directly rather than
          // building a temporary task object. Since a large number of
          // pending tasks is not expected, the more efficient approach
          // is currently not worth the bother.
          appendTask(
              v1::agent::Response::GetTasks::kPendingTasksFieldNumber,
              protobuf::createTask(taskInfo, TASK_STAGING, framework->id()));
        }
      }
    }
  }

  // All remaining task categories are tracked per executor.
  foreachpair (const Executor* executor,
               const Framework* framework,
               viewableExecutors) {
    // Queued tasks.
    foreachvalue (const TaskInfo& taskInfo, executor->queuedTasks) {
      if (approvers->approved<VIEW_TASK>(taskInfo, framework->info)) {
        // TODO(bmahler): Consider serializing directly rather than
        // building a temporary task object. Since a large number of
        // queued tasks is not expected, the more efficient approach
        // is currently not worth the bother.
        appendTask(
            v1::agent::Response::GetTasks::kQueuedTasksFieldNumber,
            protobuf::createTask(taskInfo, TASK_STAGING, framework->id()));
      }
    }

    // Launched tasks.
    foreachvalue (Task* launched, executor->launchedTasks) {
      if (approvers->approved<VIEW_TASK>(*launched, framework->info)) {
        appendTask(
            v1::agent::Response::GetTasks::kLaunchedTasksFieldNumber,
            *launched);
      }
    }

    // Terminated tasks.
    foreachvalue (Task* terminated, executor->terminatedTasks) {
      if (approvers->approved<VIEW_TASK>(*terminated, framework->info)) {
        appendTask(
            v1::agent::Response::GetTasks::kTerminatedTasksFieldNumber,
            *terminated);
      }
    }

    // Completed tasks.
    foreach (
        const std::shared_ptr<Task>& completed, executor->completedTasks) {
      if (approvers->approved<VIEW_TASK>(*completed, framework->info)) {
        appendTask(
            v1::agent::Response::GetTasks::kCompletedTasksFieldNumber,
            *completed);
      }
    }
  }

  // Explicit Trim(): the coded output stream going out of scope is
  // expected to make this redundant here, but a missing Trim() is a
  // very tricky bug to track down, so it is always done explicitly
  // as a signal to the reader about this subtlety.
  coded.Trim();

  return encoded;
}
// Handles the GET_AGENT call.
//
// Builds a GET_AGENT response carrying the agent's info and, when a
// drain config is set, that config plus (if known) the estimated
// drain start time. The response is serialized according to
// `acceptType`. `principal` is not consulted by this handler.
Future<Response> Http::getAgent(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_AGENT, call.type());

  LOG(INFO) << "Processing GET_AGENT call";

  agent::Response response;
  response.set_type(mesos::agent::Response::GET_AGENT);

  auto* agentDetails = response.mutable_get_agent();
  agentDetails->mutable_slave_info()->CopyFrom(slave->info);

  if (slave->drainConfig.isSome()) {
    agentDetails->mutable_drain_config()->CopyFrom(slave->drainConfig.get());

    // The estimated start time is only reported alongside a drain config.
    if (slave->estimatedDrainStartTime.isSome()) {
      const auto startNanos =
        Seconds(slave->estimatedDrainStartTime->secs()).ns();

      agentDetails->mutable_estimated_drain_start_time()
        ->set_nanoseconds(startNanos);
    }
  }

  return OK(
      serialize(acceptType, evolve(response)),
      stringify(acceptType));
}
// Handles the GET_RESOURCE_PROVIDERS call.
//
// Once approvers for VIEW_RESOURCE_PROVIDER have been created for
// `principal`, responds with the info and total resources of each
// of the agent's resource providers for which the approval check
// passes, serialized according to `acceptType`.
Future<Response> Http::getResourceProviders(
    const mesos::agent::Call& call,
    ContentType acceptType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_RESOURCE_PROVIDERS, call.type());

  LOG(INFO) << "Processing GET_RESOURCE_PROVIDERS call";

  return ObjectApprovers::create(
      slave->authorizer,
      principal,
      {VIEW_RESOURCE_PROVIDER})
    .then(defer(
        slave->self(),
        [this, acceptType](
            const Owned<ObjectApprovers>& approvers) -> Response {
          agent::Response response;
          response.set_type(mesos::agent::Response::GET_RESOURCE_PROVIDERS);

          auto* listing = response.mutable_get_resource_providers();

          foreachvalue (
              ResourceProvider* known, slave->resourceProviders) {
            // NOTE(review): the approval check takes no per-provider
            // object, so presumably it gives the same answer on every
            // iteration -- confirm before hoisting it out of the loop.
            if (approvers->approved<VIEW_RESOURCE_PROVIDER>()) {
              auto* entry = listing->add_resource_providers();

              entry->mutable_resource_provider_info()->CopyFrom(
                  known->info);

              entry->mutable_total_resources()->CopyFrom(
                  known->totalResources);
            }
          }

          return OK(
              serialize(acceptType, evolve(response)),
              stringify(acceptType));
        }));
}
// Handles the GET_STATE call.
//
// Once approvers for VIEW_FRAMEWORK, VIEW_TASK and VIEW_EXECUTOR
// have been created for `principal`, the response is written
// straight into the requested representation. It corresponds to:
//
//   v1::agent::Response response;
//   response.set_type(mesos::agent::Response::GET_STATE);
//   *response.mutable_get_state() = ...;
//
// Content types other than protobuf and JSON are answered with
// `NotAcceptable`.
Future<Response> Http::getState(
    const mesos::agent::Call& call,
    ContentType contentType,
    const Option<Principal>& principal) const
{
  CHECK_EQ(mesos::agent::Call::GET_STATE, call.type());

  LOG(INFO) << "Processing GET_STATE call";

  return ObjectApprovers::create(
      slave->authorizer,
      principal,
      {VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR})
    .then(defer(
        slave->self(),
        [this, contentType](
            const Owned<ObjectApprovers>& approvers) -> Response {
          if (contentType == ContentType::PROTOBUF) {
            string encoded;
            google::protobuf::io::StringOutputStream sink(&encoded);
            google::protobuf::io::CodedOutputStream coded(&sink);

            WireFormatLite::WriteEnum(
                v1::agent::Response::kTypeFieldNumber,
                v1::agent::Response::GET_STATE,
                &coded);

            WireFormatLite::WriteBytes(
                v1::agent::Response::kGetStateFieldNumber,
                serializeGetState(approvers),
                &coded);

            // The string is handed off while the coded output stream
            // is still alive, so the unused buffer space has to be
            // trimmed by hand first.
            coded.Trim();

            return OK(std::move(encoded), stringify(contentType));
          }

          if (contentType == ContentType::JSON) {
            string body = jsonify([&](JSON::ObjectWriter* writer) {
              const google::protobuf::Descriptor* descriptor =
                v1::agent::Response::descriptor();

              // Field names are looked up from the message descriptor
              // via the generated field numbers.
              writer->field(
                  descriptor->FindFieldByNumber(
                      v1::agent::Response::kTypeFieldNumber)->name(),
                  v1::agent::Response::Type_Name(
                      v1::agent::Response::GET_STATE));

              writer->field(
                  descriptor->FindFieldByNumber(
                      v1::agent::Response::kGetStateFieldNumber)->name(),
                  jsonifyGetState(approvers));
            });

            // TODO(bmahler): Pass jsonp query parameter through here.
            return OK(std::move(body), stringify(contentType));
          }

          return NotAcceptable("Request must accept json or protobuf");
        }));
}
// Returns a writer callback that emits the JSON form of a
// `v1::agent::Response::GetState` message:
//
//   v1::agent::Response::GetState getState;
//   *getState.mutable_get_tasks() = ...;
//   *getState.mutable_get_executors() = ...;
//   *getState.mutable_get_frameworks() = ...;
//
// Each nested message is produced by the matching `jsonifyGet*`
// member, all given the same `approvers`.
function<void(JSON::ObjectWriter*)> Http::jsonifyGetState(
    const Owned<ObjectApprovers>& approvers) const
{
  // TODO(bmahler): This copies the Owned object approvers.
  return [this, approvers](JSON::ObjectWriter* writer) {
    const google::protobuf::Descriptor* descriptor =
      v1::agent::Response::GetState::descriptor();

    // Field names are looked up from the message descriptor via the
    // generated field numbers.
    writer->field(
        descriptor->FindFieldByNumber(
            v1::agent::Response::GetState::kGetTasksFieldNumber)->name(),
        jsonifyGetTasks(approvers));

    writer->field(
        descriptor->FindFieldByNumber(
            v1::agent::Response::GetState::kGetExecutorsFieldNumber)->name(),
        jsonifyGetExecutors(approvers));

    writer->field(
        descriptor->FindFieldByNumber(
            v1::agent::Response::GetState::kGetFrameworksFieldNumber)->name(),
        jsonifyGetFrameworks(approvers));
  };
}
// Encodes a `v1::agent::Response::GetState` message directly in the
// protobuf wire format:
//
//   v1::agent::Response::GetState getState;
//   *getState.mutable_get_tasks() = ...;
//   *getState.mutable_get_executors() = ...;
//   *getState.mutable_get_frameworks() = ...;
//
// Each nested message is produced by the matching `serializeGet*`
// member, all given the same `approvers`.
string Http::serializeGetState(
    const Owned<ObjectApprovers>& approvers) const
{
  string encoded;
  google::protobuf::io::StringOutputStream sink(&encoded);
  google::protobuf::io::CodedOutputStream coded(&sink);

  // Writes already-serialized `bytes` as the value of the field
  // identified by `fieldNumber`.
  auto appendBytes = [&coded](int fieldNumber, const string& bytes) {
    WireFormatLite::WriteBytes(fieldNumber, bytes, &coded);
  };

  appendBytes(
      v1::agent::Response::GetState::kGetTasksFieldNumber,
      serializeGetTasks(approvers));

  appendBytes(
      v1::agent::Response::GetState::kGetExecutorsFieldNumber,
      serializeGetExecutors(approvers));

  appendBytes(
      v1::agent::Response::GetState::kGetFrameworksFieldNumber,
      serializeGetFrameworks(approvers));

  // Explicit Trim(): the coded output stream going out of scope is
  // expected to make this redundant here, but a missing Trim() is a
  // very tricky bug to track down, so it is always done explicitly
  // as a signal to the reader about this subtlety.
  coded.Trim();

  return encoded;
}
string Http::STATISTICS_HELP()
{
return HELP(
TLDR(
"Retrieve resource monitoring information."),
DESCRIPTION(
"Returns the current resource consumption data for containers",
"running under this agent.",
"",
"Example:",
"",
"```",
"[{",
" \"executor_id\":\"executor\",",
" \"executor_name\":\"name\",",
" \"framework_id\":\"framework\",",