blob: 034419bc95f652b5c66a07e722261878f228162c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include "master/master.hpp"
#include <string>
#include <vector>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/wire_format_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <mesos/mesos.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/jsonify.hpp>
#include <stout/option.hpp>
#include <stout/representation.hpp>
#include "common/build.hpp"
#include "common/http.hpp"
using google::protobuf::internal::WireFormatLite;
using process::Owned;
using process::http::NotAcceptable;
using process::http::OK;
using process::http::Response;
using mesos::authorization::VIEW_EXECUTOR;
using mesos::authorization::VIEW_FLAGS;
using mesos::authorization::VIEW_FRAMEWORK;
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::VIEW_TASK;
using mesos::internal::protobuf::WireFormatLite2;
using std::function;
using std::pair;
using std::string;
using std::vector;
namespace mesos {
namespace internal {
namespace master {
// Pull in model overrides from common.
using mesos::internal::model;
// The summary representation of `T` to support the `/state-summary` endpoint.
// e.g., `Summary<Slave>`.
template <typename T>
struct Summary : Representation<T>
{
using Representation<T>::Representation;
};
// The full representation of `T` to support the `/state` endpoint.
// e.g., `Full<Slave>`.
template <typename T>
struct Full : Representation<T>
{
using Representation<T>::Representation;
};
// Filtered representation of Full<Framework>.
// Executors and Tasks are filtered based on whether the
// user is authorized to view them.
//
// TODO(bevers): Consider moving writers and other json-related
// code into a separate file.
struct FullFrameworkWriter {
FullFrameworkWriter(
const process::Owned<ObjectApprovers>& approvers,
const Framework* framework);
void operator()(JSON::ObjectWriter* writer) const;
const process::Owned<ObjectApprovers>& approvers_;
const Framework* framework_;
};
struct SlaveWriter
{
SlaveWriter(
const Slave& slave,
const Option<DrainInfo>& drainInfo,
bool deactivated,
const process::Owned<ObjectApprovers>& approvers);
void operator()(JSON::ObjectWriter* writer) const;
const Slave& slave_;
const Option<DrainInfo> drainInfo_;
const bool deactivated_;
const process::Owned<ObjectApprovers>& approvers_;
};
struct SlavesWriter
{
SlavesWriter(
const Master::Slaves& slaves,
const process::Owned<ObjectApprovers>& approvers,
const IDAcceptor<SlaveID>& selectSlaveId);
void operator()(JSON::ObjectWriter* writer) const;
void writeSlave(const Slave* slave, JSON::ObjectWriter* writer) const;
const Master::Slaves& slaves_;
const process::Owned<ObjectApprovers>& approvers_;
const IDAcceptor<SlaveID>& selectSlaveId_;
};
void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary);
FullFrameworkWriter::FullFrameworkWriter(
const Owned<ObjectApprovers>& approvers,
const Framework* framework)
: approvers_(approvers),
framework_(framework)
{}
void FullFrameworkWriter::operator()(JSON::ObjectWriter* writer) const
{
json(writer, Summary<Framework>(*framework_));
// Add additional fields to those generated by the
// `Summary<Framework>` overload.
writer->field("user", framework_->info.user());
writer->field("failover_timeout", framework_->info.failover_timeout());
writer->field("checkpoint", framework_->info.checkpoint());
writer->field("registered_time", framework_->registeredTime.secs());
writer->field("unregistered_time", framework_->unregisteredTime.secs());
if (framework_->info.has_principal()) {
writer->field("principal", framework_->info.principal());
}
// TODO(bmahler): Consider deprecating this in favor of the split
// used and offered resources added in `Summary<Framework>`.
writer->field(
"resources",
framework_->totalUsedResources + framework_->totalOfferedResources);
// TODO(benh): Consider making reregisteredTime an Option.
if (framework_->registeredTime != framework_->reregisteredTime) {
writer->field("reregistered_time", framework_->reregisteredTime.secs());
}
// For multi-role frameworks the `role` field will be unset.
// Note that we could set `roles` here for both cases, which
// would make tooling simpler (only need to look for `roles`).
// However, we opted to just mirror the protobuf akin to how
// generic protobuf -> JSON translation works.
if (framework_->capabilities.multiRole) {
writer->field("roles", framework_->info.roles());
} else {
writer->field("role", framework_->info.role());
}
// Model all of the tasks associated with a framework.
writer->field("tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (Task* task, framework_->tasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) {
foreach (const Owned<Task>& task, framework_->completedTasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
// Model all of the offers associated with a framework.
writer->field("offers", [this](JSON::ArrayWriter* writer) {
foreach (Offer* offer, framework_->offers) {
writer->element(*offer);
}
});
// Model all of the executors of a framework.
writer->field("executors", [this](JSON::ArrayWriter* writer) {
foreachpair (
const SlaveID& slaveId,
const auto& executorsMap,
framework_->executors) {
foreachvalue (const ExecutorInfo& executor, executorsMap) {
writer->element([this,
&executor,
&slaveId](JSON::ObjectWriter* writer) {
// Skip unauthorized executors.
if (!approvers_->approved<VIEW_EXECUTOR>(
executor, framework_->info)) {
return;
}
json(writer, executor);
writer->field("slave_id", slaveId.value());
});
}
}
});
// Model all of the labels associated with a framework.
if (framework_->info.has_labels()) {
writer->field("labels", framework_->info.labels());
}
writer->field(
"offer_constraints", JSON::Protobuf(framework_->offerConstraints()));
}
SlaveWriter::SlaveWriter(
const Slave& slave,
const Option<DrainInfo>& drainInfo,
bool deactivated,
const Owned<ObjectApprovers>& approvers)
: slave_(slave),
drainInfo_(drainInfo),
deactivated_(deactivated),
approvers_(approvers)
{}
void SlaveWriter::operator()(JSON::ObjectWriter* writer) const
{
json(writer, slave_.info);
writer->field("pid", string(slave_.pid));
writer->field("registered_time", slave_.registeredTime.secs());
if (slave_.reregisteredTime.isSome()) {
writer->field("reregistered_time", slave_.reregisteredTime->secs());
}
const Resources& totalResources = slave_.totalResources;
writer->field("resources", totalResources);
writer->field("used_resources", Resources::sum(slave_.usedResources));
writer->field("offered_resources", slave_.offeredResources);
writer->field(
"reserved_resources",
[&totalResources, this](JSON::ObjectWriter* writer) {
foreachpair (const string& role, const Resources& reservation,
totalResources.reservations()) {
// TODO(arojas): Consider showing unapproved resources in an
// aggregated special field, so that all resource values add up
// MESOS-7779.
if (approvers_->approved<VIEW_ROLE>(role)) {
writer->field(role, reservation);
}
}
});
writer->field("unreserved_resources", totalResources.unreserved());
writer->field("active", slave_.active);
writer->field("deactivated", deactivated_);
writer->field("version", slave_.version);
writer->field("capabilities", slave_.capabilities.toRepeatedPtrField());
if (drainInfo_.isSome()) {
writer->field("drain_info", JSON::Protobuf(drainInfo_.get()));
if (slave_.estimatedDrainStartTime.isSome()) {
writer->field(
"estimated_drain_start_time_seconds",
slave_.estimatedDrainStartTime->secs());
}
}
}
SlavesWriter::SlavesWriter(
const Master::Slaves& slaves,
const Owned<ObjectApprovers>& approvers,
const IDAcceptor<SlaveID>& selectSlaveId)
: slaves_(slaves), approvers_(approvers), selectSlaveId_(selectSlaveId)
{}
void SlavesWriter::operator()(JSON::ObjectWriter* writer) const
{
writer->field("slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Slave* slave, slaves_.registered) {
if (!selectSlaveId_.accept(slave->id)) {
continue;
}
writer->element([this, &slave](JSON::ObjectWriter* writer) {
writeSlave(slave, writer);
});
}
});
writer->field("recovered_slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const SlaveInfo& slaveInfo, slaves_.recovered) {
if (!selectSlaveId_.accept(slaveInfo.id())) {
continue;
}
writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
json(writer, slaveInfo);
});
}
});
}
void SlavesWriter::writeSlave(
const Slave* slave, JSON::ObjectWriter* writer) const
{
SlaveWriter(
*slave,
slaves_.draining.get(slave->id),
slaves_.deactivated.contains(slave->id),
approvers_)(writer);
// Add the complete protobuf->JSON for all used, reserved,
// and offered resources. The other endpoints summarize
// resource information, which omits the details of
// reservations and persistent volumes. Full resource
// information is necessary so that operators can use the
// `/unreserve` and `/destroy-volumes` endpoints.
hashmap<string, Resources> reserved = slave->totalResources.reservations();
writer->field(
"reserved_resources_full",
[&reserved, this](JSON::ObjectWriter* writer) {
foreachpair (const string& role,
const Resources& resources,
reserved) {
if (approvers_->approved<VIEW_ROLE>(role)) {
writer->field(role, [&resources, this](
JSON::ArrayWriter* writer) {
foreach (Resource resource, resources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
}
}
});
Resources unreservedResources = slave->totalResources.unreserved();
writer->field(
"unreserved_resources_full",
[&unreservedResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, unreservedResources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
Resources usedResources = Resources::sum(slave->usedResources);
writer->field(
"used_resources_full",
[&usedResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, usedResources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
const Resources& offeredResources = slave->offeredResources;
writer->field(
"offered_resources_full",
[&offeredResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, offeredResources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
}
void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary)
{
const Framework& framework = summary;
writer->field("id", framework.id().value());
writer->field("name", framework.info.name());
// Omit pid for http frameworks.
if (framework.pid().isSome()) {
writer->field("pid", string(framework.pid().get()));
}
// TODO(bmahler): Use these in the webui.
writer->field("used_resources", framework.totalUsedResources);
writer->field("offered_resources", framework.totalOfferedResources);
writer->field("capabilities", framework.info.capabilities());
writer->field("hostname", framework.info.hostname());
writer->field("webui_url", framework.info.webui_url());
writer->field("active", framework.active());
writer->field("connected", framework.connected());
writer->field("recovered", framework.recovered());
}
// This abstraction has no side-effects. It factors out computing the
// mapping from 'slaves' to 'frameworks' to answer the questions 'what
// frameworks are running on a given slave?' and 'what slaves are
// running the given framework?'.
class SlaveFrameworkMapping
{
public:
SlaveFrameworkMapping(const hashmap<FrameworkID, Framework*>& frameworks)
{
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
foreachvalue (const Task* task, framework->tasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
foreach (const Owned<Task>& task, framework->completedTasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
}
}
const hashset<FrameworkID>& frameworks(const SlaveID& slaveId) const
{
const auto iterator = slavesToFrameworks.find(slaveId);
return iterator != slavesToFrameworks.end() ?
iterator->second : hashset<FrameworkID>::EMPTY;
}
const hashset<SlaveID>& slaves(const FrameworkID& frameworkId) const
{
const auto iterator = frameworksToSlaves.find(frameworkId);
return iterator != frameworksToSlaves.end() ?
iterator->second : hashset<SlaveID>::EMPTY;
}
private:
hashmap<SlaveID, hashset<FrameworkID>> slavesToFrameworks;
hashmap<FrameworkID, hashset<SlaveID>> frameworksToSlaves;
};
// This abstraction has no side-effects. It factors out the accounting
// for a 'TaskState' summary. We use this to summarize 'TaskState's
// for both frameworks as well as slaves.
struct TaskStateSummary
{
// TODO(jmlvanre): Possibly clean this up as per MESOS-2694.
const static TaskStateSummary EMPTY;
TaskStateSummary()
: staging(0),
starting(0),
running(0),
killing(0),
finished(0),
killed(0),
failed(0),
lost(0),
error(0),
dropped(0),
unreachable(0),
gone(0),
gone_by_operator(0),
unknown(0) {}
// Account for the state of the given task.
void count(const Task& task)
{
switch (task.state()) {
case TASK_STAGING: { ++staging; break; }
case TASK_STARTING: { ++starting; break; }
case TASK_RUNNING: { ++running; break; }
case TASK_KILLING: { ++killing; break; }
case TASK_FINISHED: { ++finished; break; }
case TASK_KILLED: { ++killed; break; }
case TASK_FAILED: { ++failed; break; }
case TASK_LOST: { ++lost; break; }
case TASK_ERROR: { ++error; break; }
case TASK_DROPPED: { ++dropped; break; }
case TASK_UNREACHABLE: { ++unreachable; break; }
case TASK_GONE: { ++gone; break; }
case TASK_GONE_BY_OPERATOR: { ++gone_by_operator; break; }
case TASK_UNKNOWN: { ++unknown; break; }
// No default case allows for a helpful compiler error if we
// introduce a new state.
}
}
size_t staging;
size_t starting;
size_t running;
size_t killing;
size_t finished;
size_t killed;
size_t failed;
size_t lost;
size_t error;
size_t dropped;
size_t unreachable;
size_t gone;
size_t gone_by_operator;
size_t unknown;
};
const TaskStateSummary TaskStateSummary::EMPTY;
// This abstraction has no side-effects. It factors out computing the
// 'TaskState' summaries for frameworks and slaves. This answers the
// questions 'How many tasks are in each state for a given framework?'
// and 'How many tasks are in each state for a given slave?'.
class TaskStateSummaries
{
public:
TaskStateSummaries(const hashmap<FrameworkID, Framework*>& frameworks)
{
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
foreachvalue (const Task* task, framework->tasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
foreach (const Owned<Task>& task, framework->completedTasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
}
}
const TaskStateSummary& framework(const FrameworkID& frameworkId) const
{
const auto iterator = frameworkTaskSummaries.find(frameworkId);
return iterator != frameworkTaskSummaries.end() ?
iterator->second : TaskStateSummary::EMPTY;
}
const TaskStateSummary& slave(const SlaveID& slaveId) const
{
const auto iterator = slaveTaskSummaries.find(slaveId);
return iterator != slaveTaskSummaries.end() ?
iterator->second : TaskStateSummary::EMPTY;
}
private:
hashmap<FrameworkID, TaskStateSummary> frameworkTaskSummaries;
hashmap<SlaveID, TaskStateSummary> slaveTaskSummaries;
};
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::frameworks(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
IDAcceptor<FrameworkID> selectFrameworkId(
query.get("framework_id"));
// This lambda is consumed before the outer lambda
// returns, hence capture by reference is fine here.
const Master* master = this->master;
auto frameworks = [master, &approvers, &selectFrameworkId](
JSON::ObjectWriter* writer) {
// Model all of the frameworks.
writer->field(
"frameworks",
[master, &approvers, &selectFrameworkId](
JSON::ArrayWriter* writer) {
foreachvalue (
Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks or frameworks
// without a matching ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(FullFrameworkWriter(approvers, framework));
}
});
// Model all of the completed frameworks.
writer->field(
"completed_frameworks",
[master, &approvers, &selectFrameworkId](
JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks or frameworks
// without a matching ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(
FullFrameworkWriter(approvers, framework.get()));
}
});
// Unregistered frameworks are no longer possible. We emit an
// empty array for the sake of backward compatibility.
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(jsonify(frameworks), query.get("jsonp")),
None());
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::roles(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
const Master* master = this->master;
const vector<string> knownRoles = master->knownRoles();
auto roles = [&](JSON::ObjectWriter* writer) {
writer->field(
"roles",
[&](JSON::ArrayWriter* writer) {
foreach (const string& name, knownRoles) {
if (!approvers->approved<VIEW_ROLE>(name)) {
continue;
}
writer->element([&](JSON::ObjectWriter* writer) {
writer->field("name", name);
writer->field(
"weight",
master->weights.get(name).getOrElse(DEFAULT_WEIGHT));
Option<Role*> role = master->roles.get(name);
RoleResourceBreakdown resourceBreakdown(master, name);
// Prior to Mesos 1.9, this field is filled based on
// `QuotaInfo` which is now deprecated. For backward
// compatibility reasons, we do not use any formatter
// for the new struct but construct the response by hand.
// Specifically:
//
// - We keep the `role` field which was present in the
// `QuotaInfo`.
//
// - We name the field using singular `guarantee` and `limit`
// which is different from the plural used in `QuotaConfig`.
const Quota quota = master->quotas.get(name).getOrElse(Quota());
writer->field("quota", [&](JSON::ObjectWriter* writer) {
writer->field("role", name);
writer->field("guarantee", quota.guarantees);
writer->field("limit", quota.limits);
writer->field("consumed", resourceBreakdown.consumedQuota());
});
ResourceQuantities allocated = resourceBreakdown.allocated();
ResourceQuantities offered = resourceBreakdown.offered();
// Deprecated by allocated, offered, reserved.
writer->field("resources", allocated + offered);
writer->field("allocated", allocated);
writer->field("offered", offered);
writer->field("reserved", resourceBreakdown.reserved());
if (role.isNone()) {
writer->field("frameworks", [](JSON::ArrayWriter*) {});
} else {
writer->field("frameworks", [&](JSON::ArrayWriter* writer) {
foreachkey (const FrameworkID& id, (*role)->frameworks) {
writer->element(id.value());
}
});
}
});
}
});
};
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(jsonify(roles), query.get("jsonp")),
None());
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::slaves(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id"));
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)),
query.get("jsonp")),
None());
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::state(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
const Master* master = this->master;
auto calculateState = [master, &approvers](JSON::ObjectWriter* writer) {
writer->field("version", MESOS_VERSION);
if (build::GIT_SHA.isSome()) {
writer->field("git_sha", build::GIT_SHA.get());
}
if (build::GIT_BRANCH.isSome()) {
writer->field("git_branch", build::GIT_BRANCH.get());
}
if (build::GIT_TAG.isSome()) {
writer->field("git_tag", build::GIT_TAG.get());
}
writer->field("build_date", build::DATE);
writer->field("build_time", build::TIME);
writer->field("build_user", build::USER);
writer->field("start_time", master->startTime.secs());
if (master->electedTime.isSome()) {
writer->field("elected_time", master->electedTime->secs());
}
writer->field("id", master->info().id());
writer->field("pid", string(master->self()));
writer->field("hostname", master->info().hostname());
writer->field("capabilities", master->info().capabilities());
writer->field("activated_slaves", master->_const_slaves_active());
writer->field("deactivated_slaves", master->_const_slaves_inactive());
writer->field("unreachable_slaves", master->_const_slaves_unreachable());
if (master->info().has_domain()) {
writer->field("domain", master->info().domain());
}
// TODO(haosdent): Deprecated this in favor of `leader_info` below.
if (master->leader.isSome()) {
writer->field("leader", master->leader->pid());
}
if (master->leader.isSome()) {
writer->field("leader_info", [master](JSON::ObjectWriter* writer) {
json(writer, master->leader.get());
});
}
if (approvers->approved<VIEW_FLAGS>()) {
if (master->flags.cluster.isSome()) {
writer->field("cluster", master->flags.cluster.get());
}
if (master->flags.log_dir.isSome()) {
writer->field("log_dir", master->flags.log_dir.get());
}
if (master->flags.external_log_file.isSome()) {
writer->field("external_log_file",
master->flags.external_log_file.get());
}
writer->field("flags", [master](JSON::ObjectWriter* writer) {
foreachvalue (const flags::Flag& flag, master->flags) {
Option<string> value = flag.stringify(master->flags);
if (value.isSome()) {
writer->field(flag.effective_name().value, value.get());
}
}
});
}
// Model all of the registered slaves.
writer->field(
"slaves",
[master, &approvers](JSON::ArrayWriter* writer) {
foreachvalue (Slave* slave, master->slaves.registered) {
writer->element(SlaveWriter(
*slave,
master->slaves.draining.get(slave->id),
master->slaves.deactivated.contains(slave->id),
approvers));
}
});
// Model all of the recovered slaves.
writer->field(
"recovered_slaves",
[master](JSON::ArrayWriter* writer) {
foreachvalue (
const SlaveInfo& slaveInfo, master->slaves.recovered) {
writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
json(writer, slaveInfo);
});
}
});
// Model all of the frameworks.
writer->field(
"frameworks",
[master, &approvers](JSON::ArrayWriter* writer) {
foreachvalue (
Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(FullFrameworkWriter(approvers, framework));
}
});
// Model all of the completed frameworks.
writer->field(
"completed_frameworks",
[master, &approvers](JSON::ArrayWriter* writer) {
foreachvalue (
const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(
FullFrameworkWriter(approvers, framework.get()));
}
});
// Orphan tasks are no longer possible. We emit an empty array
// for the sake of backward compatibility.
writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
// Unregistered frameworks are no longer possible. We emit an
// empty array for the sake of backward compatibility.
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(jsonify(calculateState), query.get("jsonp")),
None());
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::stateSummary(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
const Master* master = this->master;
auto stateSummary = [master, &approvers](JSON::ObjectWriter* writer) {
writer->field("hostname", master->info().hostname());
if (master->flags.cluster.isSome()) {
writer->field("cluster", master->flags.cluster.get());
}
// We use the tasks in the 'Frameworks' struct to compute summaries
// for this endpoint. This is done 1) for consistency between the
// 'slaves' and 'frameworks' subsections below 2) because we want to
// provide summary information for frameworks that are currently
// registered 3) the frameworks keep a circular buffer of completed
// tasks that we can use to keep a limited view on the history of
// recent completed / failed tasks.
// Generate mappings from 'slave' to 'framework' and reverse.
SlaveFrameworkMapping slaveFrameworkMapping(
master->frameworks.registered);
// Generate 'TaskState' summaries for all framework and slave ids.
TaskStateSummaries taskStateSummaries(
master->frameworks.registered);
// Model all of the slaves.
writer->field(
"slaves",
[master,
&slaveFrameworkMapping,
&taskStateSummaries,
&approvers](JSON::ArrayWriter* writer) {
foreachvalue (Slave* slave, master->slaves.registered) {
writer->element(
[&slave,
&master,
&slaveFrameworkMapping,
&taskStateSummaries,
&approvers](JSON::ObjectWriter* writer) {
SlaveWriter slaveWriter(
*slave,
master->slaves.draining.get(slave->id),
master->slaves.deactivated.contains(slave->id),
approvers);
slaveWriter(writer);
// Add the 'TaskState' summary for this slave.
const TaskStateSummary& summary =
taskStateSummaries.slave(slave->id);
// Certain per-agent status totals will always be zero
// (e.g., TASK_ERROR, TASK_UNREACHABLE). We report
// them here anyway, for completeness.
//
// TODO(neilc): Update for TASK_GONE and
// TASK_GONE_BY_OPERATOR.
writer->field("TASK_STAGING", summary.staging);
writer->field("TASK_STARTING", summary.starting);
writer->field("TASK_RUNNING", summary.running);
writer->field("TASK_KILLING", summary.killing);
writer->field("TASK_FINISHED", summary.finished);
writer->field("TASK_KILLED", summary.killed);
writer->field("TASK_FAILED", summary.failed);
writer->field("TASK_LOST", summary.lost);
writer->field("TASK_ERROR", summary.error);
writer->field(
"TASK_UNREACHABLE",
summary.unreachable);
// Add the ids of all the frameworks running on this
// slave.
const hashset<FrameworkID>& frameworks =
slaveFrameworkMapping.frameworks(slave->id);
writer->field(
"framework_ids",
[&frameworks](JSON::ArrayWriter* writer) {
foreach (
const FrameworkID& frameworkId,
frameworks) {
writer->element(frameworkId.value());
}
});
});
}
});
// Model all of the frameworks.
writer->field(
"frameworks",
[master,
&slaveFrameworkMapping,
&taskStateSummaries,
&approvers](JSON::ArrayWriter* writer) {
foreachpair (const FrameworkID& frameworkId,
Framework* framework,
master->frameworks.registered) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(
[&frameworkId,
&framework,
&slaveFrameworkMapping,
&taskStateSummaries](JSON::ObjectWriter* writer) {
json(writer, Summary<Framework>(*framework));
// Add the 'TaskState' summary for this framework.
const TaskStateSummary& summary =
taskStateSummaries.framework(frameworkId);
// TODO(neilc): Update for TASK_GONE and
// TASK_GONE_BY_OPERATOR.
writer->field("TASK_STAGING", summary.staging);
writer->field("TASK_STARTING", summary.starting);
writer->field("TASK_RUNNING", summary.running);
writer->field("TASK_KILLING", summary.killing);
writer->field("TASK_FINISHED", summary.finished);
writer->field("TASK_KILLED", summary.killed);
writer->field("TASK_FAILED", summary.failed);
writer->field("TASK_LOST", summary.lost);
writer->field("TASK_ERROR", summary.error);
writer->field(
"TASK_UNREACHABLE",
summary.unreachable);
// Add the ids of all the slaves running
// this framework.
const hashset<SlaveID>& slaves =
slaveFrameworkMapping.slaves(frameworkId);
writer->field(
"slave_ids",
[&slaves](JSON::ArrayWriter* writer) {
foreach (const SlaveID& slaveId, slaves) {
writer->element(slaveId.value());
}
});
});
}
});
};
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(jsonify(stateSummary), query.get("jsonp")),
None());
}
struct TaskComparator
{
static bool ascending(const Task* lhs, const Task* rhs)
{
size_t lhsSize = lhs->statuses().size();
size_t rhsSize = rhs->statuses().size();
if ((lhsSize == 0) && (rhsSize == 0)) {
return false;
}
if (lhsSize == 0) {
return true;
}
if (rhsSize == 0) {
return false;
}
return (lhs->statuses(0).timestamp() < rhs->statuses(0).timestamp());
}
static bool descending(const Task* lhs, const Task* rhs)
{
size_t lhsSize = lhs->statuses().size();
size_t rhsSize = rhs->statuses().size();
if ((lhsSize == 0) && (rhsSize == 0)) {
return false;
}
if (rhsSize == 0) {
return true;
}
if (lhsSize == 0) {
return false;
}
return (lhs->statuses(0).timestamp() > rhs->statuses(0).timestamp());
}
};
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::tasks(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
CHECK_EQ(outputContentType, ContentType::JSON);
// Get list options (limit and offset).
Result<int> result = numify<int>(query.get("limit"));
size_t limit = result.isSome() ? result.get() : TASK_LIMIT;
result = numify<int>(query.get("offset"));
size_t offset = result.isSome() ? result.get() : 0;
Option<string> order = query.get("order");
string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des";
Option<string> frameworkId = query.get("framework_id");
Option<string> taskId = query.get("task_id");
IDAcceptor<FrameworkID> selectFrameworkId(frameworkId);
IDAcceptor<TaskID> selectTaskId(taskId);
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (const Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks or frameworks without matching
// framework ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
frameworks.push_back(framework);
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks or frameworks without matching
// framework ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
frameworks.push_back(framework.get());
}
// Construct task list with both running,
// completed and unreachable tasks.
vector<const Task*> tasks;
foreach (const Framework* framework, frameworks) {
foreachvalue (Task* task, framework->tasks) {
// Skip unauthorized tasks or tasks without matching task ID.
if (!selectTaskId.accept(task->task_id()) ||
!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
tasks.push_back(task);
}
foreachvalue (
const Owned<Task>& task,
framework->unreachableTasks) {
// Skip unauthorized tasks or tasks without matching task ID.
if (!selectTaskId.accept(task->task_id()) ||
!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
tasks.push_back(task.get());
}
foreach (const Owned<Task>& task, framework->completedTasks) {
// Skip unauthorized tasks or tasks without matching task ID.
if (!selectTaskId.accept(task->task_id()) ||
!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
tasks.push_back(task.get());
}
}
// Sort tasks by task status timestamp. Default order is descending.
// The earliest timestamp is chosen for comparison when
// multiple are present.
if (_order == "asc") {
sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
} else {
sort(tasks.begin(), tasks.end(), TaskComparator::descending);
}
auto tasksWriter =
[&tasks, limit, offset](JSON::ObjectWriter* writer) {
writer->field(
"tasks",
[&tasks, limit, offset](JSON::ArrayWriter* writer) {
// Collect 'limit' number of tasks starting from 'offset'.
size_t end = std::min(offset + limit, tasks.size());
for (size_t i = offset; i < end; i++) {
writer->element(*tasks[i]);
}
});
};
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(jsonify(tasksWriter), query.get("jsonp")),
None());
}
function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetAgents(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following:
//
// mesos::master::Response::GetAgents getAgents;
// for each registered agent:
// *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
// agent,
// master->slaves.draining.get(slave->id),
// master->slaves.deactivated.contains(slave->id),
// approvers);
// for each recovered agent:
// SlaveInfo* agent = getAgents.add_recovered_agents();
// agent->CopyFrom(slaveInfo);
// agent->clear_resources();
// foreach (const Resource& resource, slaveInfo.resources()):
// if (approvers->approved<VIEW_ROLE>(resource)):
// *agent->add_resources() = resource;
// TODO(bmahler): This copies the Owned object approvers.
return [=](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::GetAgents::descriptor();
int field;
field = v1::master::Response::GetAgents::kAgentsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreachvalue (const Slave* slave, master->slaves.registered) {
// TODO(bmahler): Consider not constructing the temporary
// agent object and instead serialize directly.
mesos::master::Response::GetAgents::Agent agent =
protobuf::master::event::createAgentResponse(
*slave,
master->slaves.draining.get(slave->id),
master->slaves.deactivated.contains(slave->id),
approvers);
writer->element(asV1Protobuf(agent));
}
});
field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
// TODO(bmahler): Consider not constructing the temporary
// SlaveInfo object and instead serialize directly.
SlaveInfo agent = slaveInfo;
agent.clear_resources();
foreach (const Resource& resource, slaveInfo.resources()) {
if (approvers->approved<VIEW_ROLE>(resource)) {
*agent.add_resources() = resource;
}
}
writer->element(asV1Protobuf(agent));
}
});
};
}
string Master::ReadOnlyHandler::serializeGetAgents(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following:
//
// mesos::master::Response::GetAgents getAgents;
// for each registered agent:
// *getAgents.add_agents() = protobuf::master::event::createAgentResponse(
// agent,
// master->slaves.draining.get(slave->id),
// master->slaves.deactivated.contains(slave->id),
// approvers);
// for each recovered agent:
// SlaveInfo* agent = getAgents.add_recovered_agents();
// agent->CopyFrom(slaveInfo);
// agent->clear_resources();
// foreach (const Resource& resource, slaveInfo.resources()):
// if (approvers->approved<VIEW_ROLE>(resource)):
// *agent->add_resources() = resource;
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
foreachvalue (const Slave* slave, master->slaves.registered) {
// TODO(bmahler): Consider not constructing the temporary
// agent object and instead serialize directly.
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::master::Response::GetAgents::kAgentsFieldNumber,
protobuf::master::event::createAgentResponse(
*slave,
master->slaves.draining.get(slave->id),
master->slaves.deactivated.contains(slave->id),
approvers),
&writer);
}
foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
// TODO(bmahler): Consider not constructing the temporary
// SlaveInfo object and instead serialize directly.
SlaveInfo agent = slaveInfo;
agent.clear_resources();
foreach (const Resource& resource, slaveInfo.resources()) {
if (approvers->approved<VIEW_ROLE>(resource)) {
*agent.add_resources() = resource;
}
}
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber,
agent,
&writer);
}
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getAgents(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Response response;
// response.set_type(mesos::master::Response::GET_AGENTS);
// *response.mutable_get_agents() = _getAgents(approvers);
switch (outputContentType) {
case ContentType::PROTOBUF: {
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteEnum(
mesos::v1::master::Response::kTypeFieldNumber,
mesos::v1::master::Response::GET_AGENTS,
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::kGetAgentsFieldNumber,
serializeGetAgents(approvers),
&writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
// destructed.
writer.Trim();
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(output), stringify(outputContentType)),
None());
}
case ContentType::JSON: {
string body = jsonify([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::descriptor();
int field;
field = v1::master::Response::kTypeFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
v1::master::Response::Type_Name(
v1::master::Response::GET_AGENTS));
field = v1::master::Response::kGetAgentsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetAgents(approvers));
});
// TODO(bmahler): Pass jsonp query parameter through here.
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(body), stringify(outputContentType)),
None());
}
default:
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
NotAcceptable("Request must accept json or protobuf"), None());
}
}
function<void(JSON::ObjectWriter*)>
Master::ReadOnlyHandler::jsonifyGetFrameworks(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following:
//
// mesos::master::Response::GetFrameworks getFrameworks;
// for each framework:
// *getFrameworks.add_frameworks() = model(*framework);
// for each completed framework:
// *getFrameworks.add_completed_frameworks() = model(*framework);
// TODO(bmahler): Consider not constructing the temporary framework
// objects and instead serialize directly, but since we don't
// expect a large number of pending tasks, we currently don't
// bother with the more efficient approach.
// TODO(bmahler): This copies the Owned object approvers.
return [=](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::GetFrameworks::descriptor();
int field;
field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreachvalue (const Framework* framework,
master->frameworks.registered) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
mesos::master::Response::GetFrameworks::Framework f = model(*framework);
writer->element(asV1Protobuf(f));
}
});
field =
v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
mesos::master::Response::GetFrameworks::Framework f = model(*framework);
writer->element(asV1Protobuf(f));
}
});
};
}
string Master::ReadOnlyHandler::serializeGetFrameworks(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following:
//
// mesos::master::Response::GetFrameworks getFrameworks;
// for each framework:
// *getFrameworks.add_frameworks() = model(*framework);
// for each completed framework:
// *getFrameworks.add_completed_frameworks() = model(*framework);
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
// TODO(bmahler): Consider not constructing the temporary framework
// objects and instead serialize directly, but since we don't
// expect a large number of pending tasks, we currently don't
// bother with the more efficient approach.
foreachvalue (const Framework* framework,
master->frameworks.registered) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::master::Response::GetFrameworks::kFrameworksFieldNumber,
model(*framework),
&writer);
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber,
model(*framework),
&writer);
}
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getFrameworks(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Response response;
// response.set_type(mesos::master::Response::GET_FRAMEWORKS);
// *response.mutable_get_frameworks() = _getFrameworks(approvers);
switch (outputContentType) {
case ContentType::PROTOBUF: {
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteEnum(
mesos::v1::master::Response::kTypeFieldNumber,
mesos::v1::master::Response::GET_FRAMEWORKS,
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::kGetFrameworksFieldNumber,
serializeGetFrameworks(approvers),
&writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
// destructed.
writer.Trim();
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(output), stringify(outputContentType)),
None());
}
case ContentType::JSON: {
string body = jsonify([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::descriptor();
int field;
field = v1::master::Response::kTypeFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
v1::master::Response::Type_Name(
v1::master::Response::GET_FRAMEWORKS));
field = v1::master::Response::kGetFrameworksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetFrameworks(approvers));
});
// TODO(bmahler): Pass jsonp query parameter through here.
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(body), stringify(outputContentType)),
None());
}
default:
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
NotAcceptable("Request must accept json or protobuf"), None());
}
}
function<void(JSON::ObjectWriter*)>
Master::ReadOnlyHandler::jsonifyGetExecutors(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following:
//
// mesos::master::Response::GetExecutors getExecutors;
//
// for each (executor, agent):
// mesos::master::Response::GetExecutors::Executor* executor =
// getExecutors.add_executors();
// *executor->mutable_executor_info() = executorInfo;
// *executor->mutable_slave_id() = slaveId;
// TODO(bmahler): This copies the owned object approvers.
return [=](JSON::ObjectWriter* writer) {
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (const Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework);
}
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework.get());
}
}
const google::protobuf::Descriptor* descriptor =
v1::master::Response::GetExecutors::descriptor();
int field;
field = v1::master::Response::GetExecutors::kExecutorsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreach (const Framework* framework, frameworks) {
foreachpair (const SlaveID& slaveId,
const auto& executorsMap,
framework->executors) {
foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
// Skip unauthorized executors.
if (!approvers->approved<VIEW_EXECUTOR>(
executorInfo, framework->info)) {
continue;
}
writer->element([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::GetExecutors::Executor::descriptor();
// Serialize the following message:
//
// mesos::master::Response::GetExecutors::Executor executor;
// *executor.mutable_executor_info() = executorInfo;
// *executor.mutable_slave_id() = slaveId;
int field;
field = v1::master::Response::GetExecutors::Executor
::kExecutorInfoFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
asV1Protobuf(executorInfo));
field = v1::master::Response::GetExecutors::Executor
::kAgentIdFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
asV1Protobuf(slaveId));
});
}
}
}
});
};
}
string Master::ReadOnlyHandler::serializeGetExecutors(
const Owned<ObjectApprovers>& approvers) const
{
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (const Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework);
}
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework.get());
}
}
// Lambda for serializing the following message:
//
// mesos::master::Response::GetExecutors::Executor executor;
// *executor.mutable_executor_info() = executorInfo;
// *executor.mutable_slave_id() = slaveId;
auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) {
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::v1::master::Response::GetExecutors::Executor
::kExecutorInfoFieldNumber,
e,
&writer);
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::v1::master::Response::GetExecutors::Executor
::kAgentIdFieldNumber,
s,
&writer);
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
};
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
// Serialize the following:
//
// mesos::master::Response::GetExecutors getExecutors;
//
// for each (executor, agent):
// mesos::master::Response::GetExecutors::Executor* executor =
// getExecutors.add_executors();
// *executor->mutable_executor_info() = executorInfo;
// *executor->mutable_slave_id() = slaveId;
foreach (const Framework* framework, frameworks) {
foreachpair (const SlaveID& slaveId,
const auto& executorsMap,
framework->executors) {
foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
// Skip unauthorized executors.
if (!approvers->approved<VIEW_EXECUTOR>(
executorInfo, framework->info)) {
continue;
}
WireFormatLite::WriteBytes(
mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber,
serializeExecutor(executorInfo, slaveId),
&writer);
}
}
}
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getExecutors(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Response response;
// response.set_type(mesos::master::Response::GET_EXECUTORS);
// *response.mutable_get_executors() = _getExecutors(approvers);
switch (outputContentType) {
case ContentType::PROTOBUF: {
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteEnum(
mesos::v1::master::Response::kTypeFieldNumber,
mesos::v1::master::Response::GET_EXECUTORS,
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::kGetExecutorsFieldNumber,
serializeGetExecutors(approvers),
&writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
// destructed.
writer.Trim();
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(output), stringify(outputContentType)),
None());
}
case ContentType::JSON: {
string body = jsonify([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::descriptor();
int field;
field = v1::master::Response::kTypeFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
v1::master::Response::Type_Name(
v1::master::Response::GET_EXECUTORS));
field = v1::master::Response::kGetExecutorsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetExecutors(approvers));
});
// TODO(bmahler): Pass jsonp query parameter through here.
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(body), stringify(outputContentType)),
None());
}
default:
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
NotAcceptable("Request must accept json or protobuf"), None());
}
}
function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks(
const Owned<ObjectApprovers>& approvers) const
{
// Jsonify the following message:
//
// master::Response::GetTasks getTasks;
// for each pending task:
// *getTasks.add_pending_tasks() =
// protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
// for each task:
// *getTasks.add_tasks() = *task;
// for each unreachable task:
// *getTasks.add_unreachable_tasks() = *task;
// for each completed task:
// *getTasks.add_completed_tasks() = *task;
// TODO(bmahler): This copies the Owned object approvers.
return [=](JSON::ObjectWriter* writer) {
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (const Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework);
}
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework.get());
}
}
const google::protobuf::Descriptor* descriptor =
v1::master::Response::GetTasks::descriptor();
int field;
// Active tasks.
field = v1::master::Response::GetTasks::kTasksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreach (const Framework* framework, frameworks) {
foreachvalue (const Task* task, framework->tasks) {
// Skip unauthorized tasks.
if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
writer->element(asV1Protobuf(*task));
}
}
});
// Unreachable tasks.
field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreach (const Framework* framework, frameworks) {
foreachvalue (const Owned<Task>& task,
framework->unreachableTasks) {
// Skip unauthorized tasks.
if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
writer->element(asV1Protobuf(*task));
}
}
});
// Completed tasks.
field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
[&](JSON::ArrayWriter* writer) {
foreach (const Framework* framework, frameworks) {
foreach (const Owned<Task>& task, framework->completedTasks) {
// Skip unauthorized tasks.
if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
writer->element(asV1Protobuf(*task));
}
}
});
};
}
string Master::ReadOnlyHandler::serializeGetTasks(
const Owned<ObjectApprovers>& approvers) const
{
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (const Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks.
if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework);
}
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
frameworks.push_back(framework.get());
}
}
// Serialize the following message:
//
// mesos::master::Response::GetTasks getTasks;
// for each pending task:
// *getTasks.add_pending_tasks() =
// protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
// for each task:
// *getTasks.add_tasks() = *task;
// for each unreachable task:
// *getTasks.add_unreachable_tasks() = *task;
// for each completed task:
// *getTasks.add_completed_tasks() = *task;
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
foreach (const Framework* framework, frameworks) {
// Active tasks.
foreachvalue (const Task* task, framework->tasks) {
// Skip unauthorized tasks.
if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::v1::master::Response::GetTasks::kTasksFieldNumber,
*task,
&writer);
}
// Unreachable tasks.
foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
// Skip unauthorized tasks.
if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber,
*task,
&writer);
}
// Completed tasks.
foreach (const Owned<Task>& task, framework->completedTasks) {
// Skip unauthorized tasks.
if (!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
WireFormatLite2::WriteMessageWithoutCachedSizes(
mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber,
*task,
&writer);
}
}
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getTasks(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Response response;
// response.set_type(mesos::master::Response::GET_TASKS);
// *response.mutable_get_tasks() = _getTasks(approvers);
switch (outputContentType) {
case ContentType::PROTOBUF: {
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteEnum(
mesos::v1::master::Response::kTypeFieldNumber,
mesos::v1::master::Response::GET_TASKS,
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::kGetTasksFieldNumber,
serializeGetTasks(approvers),
&writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
// destructed.
writer.Trim();
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(output), stringify(outputContentType)),
None());
}
case ContentType::JSON: {
string body = jsonify([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::descriptor();
int field;
field = v1::master::Response::kTypeFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
v1::master::Response::Type_Name(
v1::master::Response::GET_TASKS));
field = v1::master::Response::kGetTasksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetTasks(approvers));
});
// TODO(bmahler): Pass jsonp query parameter through here.
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(body), stringify(outputContentType)),
None());
}
default:
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
NotAcceptable("Request must accept json or protobuf"), None());
}
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getOperations(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// We consider a principal to be authorized to view an operation if it
// is authorized to view the resources the operation is performed on.
auto approved = [&approvers](const Operation& operation) {
Try<Resources> consumedResources =
protobuf::getConsumedResources(operation.info());
if (consumedResources.isError()) {
LOG(WARNING)
<< "Could not approve operation " << operation.uuid()
<< " since its consumed resources could not be determined:"
<< consumedResources.error();
return false;
}
foreach (const Resource& resource, consumedResources.get()) {
if (!approvers->approved<VIEW_ROLE>(resource)) {
return false;
}
}
return true;
};
mesos::master::Response response;
response.set_type(mesos::master::Response::GET_OPERATIONS);
mesos::master::Response::GetOperations* operations =
response.mutable_get_operations();
foreachvalue (const Slave* slave, master->slaves.registered) {
foreachvalue (const Operation* operation, slave->operations) {
if (approved(*operation)) {
operations->add_operations()->CopyFrom(*operation);
}
}
foreachvalue (
const Slave::ResourceProvider& resourceProvider,
slave->resourceProviders) {
foreachvalue (const Operation* operation, resourceProvider.operations) {
if (approved(*operation)) {
operations->add_operations()->CopyFrom(*operation);
}
}
}
}
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(serialize(outputContentType, evolve(response)),
stringify(outputContentType)),
None());
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getRoles(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
const vector<string> knownRoles = master->knownRoles();
mesos::master::Response response;
response.set_type(mesos::master::Response::GET_ROLES);
mesos::master::Response::GetRoles* getRoles =
response.mutable_get_roles();
foreach (const string& name, knownRoles) {
if (!approvers->approved<VIEW_ROLE>(name)) {
continue;
}
mesos::Role* role = getRoles->add_roles();
role->set_name(name);
role->set_weight(master->weights.get(name).getOrElse(DEFAULT_WEIGHT));
RoleResourceBreakdown resourceBreakdown(master, name);
ResourceQuantities allocatedAndOffered =
resourceBreakdown.allocated() + resourceBreakdown.offered();
// `resources` will be deprecated in favor of
// `offered`, `allocated`, `reserved`, and quota consumption.
// As a result, we don't bother trying to expose more
// than {cpus, mem, disk, gpus} since we don't know if
// anything outside this set is of type SCALAR.
foreach (const auto& quantity, allocatedAndOffered) {
if (quantity.first == "cpus" || quantity.first == "mem" ||
quantity.first == "disk" || quantity.first == "gpus") {
Resource* resource = role->add_resources();
resource->set_name(quantity.first);
resource->set_type(Value::SCALAR);
*resource->mutable_scalar() = quantity.second;
}
}
Option<Role*> role_ = master->roles.get(name);
if (role_.isSome()) {
foreachkey (const FrameworkID& frameworkId, (*role_)->frameworks) {
*role->add_frameworks() = frameworkId;
}
}
}
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(serialize(outputContentType, evolve(response)),
stringify(outputContentType)),
None());
}
function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetState(
const Owned<ObjectApprovers>& approvers) const
{
// Jsonify the following message:
//
// mesos::master::Response::GetState getState;
// *getState.mutable_get_tasks() = _getTasks(approvers);
// *getState.mutable_get_executors() = _getExecutors(approvers);
// *getState.mutable_get_frameworks() = _getFrameworks(approvers);
// *getState.mutable_get_agents() = _getAgents(approvers);
// TODO(bmahler): This copies the Owned object approvers.
return [=](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::GetState::descriptor();
int field;
field = v1::master::Response::GetState::kGetTasksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetTasks(approvers));
field = v1::master::Response::GetState::kGetExecutorsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetExecutors(approvers));
field = v1::master::Response::GetState::kGetFrameworksFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetFrameworks(approvers));
field = v1::master::Response::GetState::kGetAgentsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetAgents(approvers));
};
}
string Master::ReadOnlyHandler::serializeGetState(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Response::GetState getState;
// *getState.mutable_get_tasks() = _getTasks(approvers);
// *getState.mutable_get_executors() = _getExecutors(approvers);
// *getState.mutable_get_frameworks() = _getFrameworks(approvers);
// *getState.mutable_get_agents() = _getAgents(approvers);
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::GetState::kGetTasksFieldNumber,
serializeGetTasks(approvers),
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber,
serializeGetExecutors(approvers),
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber,
serializeGetFrameworks(approvers),
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::GetState::kGetAgentsFieldNumber,
serializeGetAgents(approvers),
&writer);
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::getState(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Response response;
// response.set_type(mesos::master::Response::GET_STATE);
// *response.mutable_get_state() = _getState(approvers);
switch (outputContentType) {
case ContentType::PROTOBUF: {
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteEnum(
mesos::v1::master::Response::kTypeFieldNumber,
mesos::v1::master::Response::GET_STATE,
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Response::kGetStateFieldNumber,
serializeGetState(approvers),
&writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
// destructed.
writer.Trim();
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(output), stringify(outputContentType)),
None());
}
case ContentType::JSON: {
string body = jsonify([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Response::descriptor();
int field;
field = v1::master::Response::kTypeFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
v1::master::Response::Type_Name(
v1::master::Response::GET_STATE));
field = v1::master::Response::kGetStateFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetState(approvers));
});
// TODO(bmahler): Pass jsonp query parameter through here.
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
OK(std::move(body), stringify(outputContentType)),
None());
}
default:
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
NotAcceptable("Request must accept json or protobuf"), None());
}
}
pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>
Master::ReadOnlyHandler::subscribe(
ContentType outputContentType,
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
process::http::Pipe pipe;
OK ok;
ok.headers["Content-Type"] = stringify(outputContentType);
ok.type = process::http::Response::PIPE;
ok.reader = pipe.reader();
StreamingHttpConnection<v1::master::Event> http(
pipe.writer(), outputContentType);
// Serialize the following event:
//
// mesos::master::Event event;
// event.set_type(mesos::master::Event::SUBSCRIBED);
// *event.mutable_subscribed()->mutable_get_state() =
// _getState(approvers);
// event.mutable_subscribed()->set_heartbeat_interval_seconds(
// DEFAULT_HEARTBEAT_INTERVAL.secs());
//
// http.send(event);
switch (outputContentType) {
case ContentType::PROTOBUF: {
string serialized;
google::protobuf::io::StringOutputStream stream(&serialized);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteEnum(
mesos::v1::master::Event::kTypeFieldNumber,
mesos::v1::master::Event::SUBSCRIBED,
&writer);
WireFormatLite::WriteBytes(
mesos::v1::master::Event::kSubscribedFieldNumber,
serializeSubscribe(approvers),
&writer);
// We must manually trim the unused buffer space since
// we use the string before the coded output stream is
// destructed.
writer.Trim();
http.send(serialized);
break;
}
case ContentType::JSON: {
string serialized = jsonify([&](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Event::descriptor();
int field;
field = v1::master::Event::kTypeFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
v1::master::Event::Type_Name(
v1::master::Event::SUBSCRIBED));
field = v1::master::Event::kSubscribedFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifySubscribe(approvers));
});
http.send(serialized);
break;
}
default:
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
NotAcceptable("Request must accept json or protobuf"), None());
}
mesos::master::Event heartbeatEvent;
heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
http.send(heartbeatEvent);
// This new subscriber needs to be added in the post-processing step.
Master::ReadOnlyHandler::PostProcessing::Subscribe s = {approvers, http};
Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) };
return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>(
ok,
std::move(postProcessing));
}
function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifySubscribe(
const Owned<ObjectApprovers>& approvers) const
{
// Jsonify the following message:
//
// mesos::master::Event::Subscribed subscribed;
// *subscribed.mutable_get_state() = _getState(approvers);
// subscribed.set_heartbeat_interval_seconds(
// DEFAULT_HEARTBEAT_INTERVAL.secs());
// TODO(bmahler): This copies the Owned object approvers.
return [=](JSON::ObjectWriter* writer) {
const google::protobuf::Descriptor* descriptor =
v1::master::Event::Subscribed::descriptor();
int field;
field = v1::master::Event::Subscribed::kGetStateFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
jsonifyGetState(approvers));
field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber;
writer->field(
descriptor->FindFieldByNumber(field)->name(),
DEFAULT_HEARTBEAT_INTERVAL.secs());
};
}
string Master::ReadOnlyHandler::serializeSubscribe(
const Owned<ObjectApprovers>& approvers) const
{
// Serialize the following message:
//
// mesos::master::Event::Subscribed subscribed;
// *subscribed.mutable_get_state() = _getState(approvers);
// subscribed.set_heartbeat_interval_seconds(
// DEFAULT_HEARTBEAT_INTERVAL.secs());
string output;
google::protobuf::io::StringOutputStream stream(&output);
google::protobuf::io::CodedOutputStream writer(&stream);
WireFormatLite::WriteBytes(
mesos::v1::master::Event::Subscribed::kGetStateFieldNumber,
serializeGetState(approvers),
&writer);
WireFormatLite::WriteDouble(
mesos::v1::master::Event::Subscribed
::kHeartbeatIntervalSecondsFieldNumber,
DEFAULT_HEARTBEAT_INTERVAL.secs(),
&writer);
// While an explicit Trim() isn't necessary (since the coded
// output stream is destructed before the string is returned),
// it's a quite tricky bug to diagnose if Trim() is missed, so
// we always do it explicitly to signal the reader about this
// subtlety.
writer.Trim();
return output;
}
} // namespace master {
} // namespace internal {
} // namespace mesos {