| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License |
| |
| #include "master/master.hpp" |
| |
| #include <string> |
| #include <vector> |
| |
| #include <google/protobuf/descriptor.h> |
| #include <google/protobuf/wire_format_lite.h> |
| |
| #include <google/protobuf/io/coded_stream.h> |
| #include <google/protobuf/io/zero_copy_stream_impl_lite.h> |
| |
| #include <mesos/mesos.hpp> |
| |
| #include <mesos/authorizer/authorizer.hpp> |
| |
| #include <process/http.hpp> |
| #include <process/owned.hpp> |
| |
| #include <stout/foreach.hpp> |
| #include <stout/hashmap.hpp> |
| #include <stout/hashset.hpp> |
| #include <stout/jsonify.hpp> |
| #include <stout/option.hpp> |
| #include <stout/representation.hpp> |
| |
| #include "common/build.hpp" |
| #include "common/http.hpp" |
| |
| using google::protobuf::internal::WireFormatLite; |
| |
| using process::Owned; |
| |
| using process::http::NotAcceptable; |
| using process::http::OK; |
| using process::http::Response; |
| |
| using mesos::authorization::VIEW_EXECUTOR; |
| using mesos::authorization::VIEW_FLAGS; |
| using mesos::authorization::VIEW_FRAMEWORK; |
| using mesos::authorization::VIEW_ROLE; |
| using mesos::authorization::VIEW_TASK; |
| |
| using mesos::internal::protobuf::WireFormatLite2; |
| |
| using std::function; |
| using std::pair; |
| using std::string; |
| using std::vector; |
| |
| namespace mesos { |
| namespace internal { |
| namespace master { |
| |
| // Pull in model overrides from common. |
| using mesos::internal::model; |
| |
| // The summary representation of `T` to support the `/state-summary` endpoint. |
| // e.g., `Summary<Slave>`. |
| template <typename T> |
| struct Summary : Representation<T> |
| { |
| using Representation<T>::Representation; |
| }; |
| |
| |
| // The full representation of `T` to support the `/state` endpoint. |
| // e.g., `Full<Slave>`. |
| template <typename T> |
| struct Full : Representation<T> |
| { |
| using Representation<T>::Representation; |
| }; |
| |
| |
| // Filtered representation of Full<Framework>. |
| // Executors and Tasks are filtered based on whether the |
| // user is authorized to view them. |
| // |
| // TODO(bevers): Consider moving writers and other json-related |
| // code into a separate file. |
| struct FullFrameworkWriter { |
| FullFrameworkWriter( |
| const process::Owned<ObjectApprovers>& approvers, |
| const Framework* framework); |
| |
| void operator()(JSON::ObjectWriter* writer) const; |
| |
| const process::Owned<ObjectApprovers>& approvers_; |
| const Framework* framework_; |
| }; |
| |
| |
| struct SlaveWriter |
| { |
| SlaveWriter( |
| const Slave& slave, |
| const Option<DrainInfo>& drainInfo, |
| bool deactivated, |
| const process::Owned<ObjectApprovers>& approvers); |
| |
| void operator()(JSON::ObjectWriter* writer) const; |
| |
| const Slave& slave_; |
| const Option<DrainInfo> drainInfo_; |
| const bool deactivated_; |
| const process::Owned<ObjectApprovers>& approvers_; |
| }; |
| |
| |
| struct SlavesWriter |
| { |
| SlavesWriter( |
| const Master::Slaves& slaves, |
| const process::Owned<ObjectApprovers>& approvers, |
| const IDAcceptor<SlaveID>& selectSlaveId); |
| |
| void operator()(JSON::ObjectWriter* writer) const; |
| |
| void writeSlave(const Slave* slave, JSON::ObjectWriter* writer) const; |
| |
| const Master::Slaves& slaves_; |
| const process::Owned<ObjectApprovers>& approvers_; |
| const IDAcceptor<SlaveID>& selectSlaveId_; |
| }; |
| |
| |
| void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary); |
| |
| |
| FullFrameworkWriter::FullFrameworkWriter( |
| const Owned<ObjectApprovers>& approvers, |
| const Framework* framework) |
| : approvers_(approvers), |
| framework_(framework) |
| {} |
| |
| |
| void FullFrameworkWriter::operator()(JSON::ObjectWriter* writer) const |
| { |
| json(writer, Summary<Framework>(*framework_)); |
| |
| // Add additional fields to those generated by the |
| // `Summary<Framework>` overload. |
| writer->field("user", framework_->info.user()); |
| writer->field("failover_timeout", framework_->info.failover_timeout()); |
| writer->field("checkpoint", framework_->info.checkpoint()); |
| writer->field("registered_time", framework_->registeredTime.secs()); |
| writer->field("unregistered_time", framework_->unregisteredTime.secs()); |
| |
| if (framework_->info.has_principal()) { |
| writer->field("principal", framework_->info.principal()); |
| } |
| |
| // TODO(bmahler): Consider deprecating this in favor of the split |
| // used and offered resources added in `Summary<Framework>`. |
| writer->field( |
| "resources", |
| framework_->totalUsedResources + framework_->totalOfferedResources); |
| |
| // TODO(benh): Consider making reregisteredTime an Option. |
| if (framework_->registeredTime != framework_->reregisteredTime) { |
| writer->field("reregistered_time", framework_->reregisteredTime.secs()); |
| } |
| |
| // For multi-role frameworks the `role` field will be unset. |
| // Note that we could set `roles` here for both cases, which |
| // would make tooling simpler (only need to look for `roles`). |
| // However, we opted to just mirror the protobuf akin to how |
| // generic protobuf -> JSON translation works. |
| if (framework_->capabilities.multiRole) { |
| writer->field("roles", framework_->info.roles()); |
| } else { |
| writer->field("role", framework_->info.role()); |
| } |
| |
| // Model all of the tasks associated with a framework. |
| writer->field("tasks", [this](JSON::ArrayWriter* writer) { |
| foreachvalue (Task* task, framework_->tasks) { |
| // Skip unauthorized tasks. |
| if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) { |
| continue; |
| } |
| |
| writer->element(*task); |
| } |
| }); |
| |
| writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) { |
| foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) { |
| // Skip unauthorized tasks. |
| if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) { |
| continue; |
| } |
| |
| writer->element(*task); |
| } |
| }); |
| |
| writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) { |
| foreach (const Owned<Task>& task, framework_->completedTasks) { |
| // Skip unauthorized tasks. |
| if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) { |
| continue; |
| } |
| |
| writer->element(*task); |
| } |
| }); |
| |
| // Model all of the offers associated with a framework. |
| writer->field("offers", [this](JSON::ArrayWriter* writer) { |
| foreach (Offer* offer, framework_->offers) { |
| writer->element(*offer); |
| } |
| }); |
| |
| // Model all of the executors of a framework. |
| writer->field("executors", [this](JSON::ArrayWriter* writer) { |
| foreachpair ( |
| const SlaveID& slaveId, |
| const auto& executorsMap, |
| framework_->executors) { |
| foreachvalue (const ExecutorInfo& executor, executorsMap) { |
| writer->element([this, |
| &executor, |
| &slaveId](JSON::ObjectWriter* writer) { |
| // Skip unauthorized executors. |
| if (!approvers_->approved<VIEW_EXECUTOR>( |
| executor, framework_->info)) { |
| return; |
| } |
| |
| json(writer, executor); |
| writer->field("slave_id", slaveId.value()); |
| }); |
| } |
| } |
| }); |
| |
| // Model all of the labels associated with a framework. |
| if (framework_->info.has_labels()) { |
| writer->field("labels", framework_->info.labels()); |
| } |
| |
| if (framework_->offerConstraints().isSome()) { |
| writer->field( |
| "offer_constraints", JSON::Protobuf(*framework_->offerConstraints())); |
| }; |
| } |
| |
| |
| SlaveWriter::SlaveWriter( |
| const Slave& slave, |
| const Option<DrainInfo>& drainInfo, |
| bool deactivated, |
| const Owned<ObjectApprovers>& approvers) |
| : slave_(slave), |
| drainInfo_(drainInfo), |
| deactivated_(deactivated), |
| approvers_(approvers) |
| {} |
| |
| |
| void SlaveWriter::operator()(JSON::ObjectWriter* writer) const |
| { |
| json(writer, slave_.info); |
| |
| writer->field("pid", string(slave_.pid)); |
| writer->field("registered_time", slave_.registeredTime.secs()); |
| |
| if (slave_.reregisteredTime.isSome()) { |
| writer->field("reregistered_time", slave_.reregisteredTime->secs()); |
| } |
| |
| const Resources& totalResources = slave_.totalResources; |
| writer->field("resources", totalResources); |
| writer->field("used_resources", Resources::sum(slave_.usedResources)); |
| writer->field("offered_resources", slave_.offeredResources); |
| writer->field( |
| "reserved_resources", |
| [&totalResources, this](JSON::ObjectWriter* writer) { |
| foreachpair (const string& role, const Resources& reservation, |
| totalResources.reservations()) { |
| // TODO(arojas): Consider showing unapproved resources in an |
| // aggregated special field, so that all resource values add up |
| // MESOS-7779. |
| if (approvers_->approved<VIEW_ROLE>(role)) { |
| writer->field(role, reservation); |
| } |
| } |
| }); |
| writer->field("unreserved_resources", totalResources.unreserved()); |
| |
| writer->field("active", slave_.active); |
| writer->field("deactivated", deactivated_); |
| writer->field("version", slave_.version); |
| writer->field("capabilities", slave_.capabilities.toRepeatedPtrField()); |
| |
| if (drainInfo_.isSome()) { |
| writer->field("drain_info", JSON::Protobuf(drainInfo_.get())); |
| |
| if (slave_.estimatedDrainStartTime.isSome()) { |
| writer->field( |
| "estimated_drain_start_time_seconds", |
| slave_.estimatedDrainStartTime->secs()); |
| } |
| } |
| } |
| |
| |
| SlavesWriter::SlavesWriter( |
| const Master::Slaves& slaves, |
| const Owned<ObjectApprovers>& approvers, |
| const IDAcceptor<SlaveID>& selectSlaveId) |
| : slaves_(slaves), approvers_(approvers), selectSlaveId_(selectSlaveId) |
| {} |
| |
| |
| void SlavesWriter::operator()(JSON::ObjectWriter* writer) const |
| { |
| writer->field("slaves", [this](JSON::ArrayWriter* writer) { |
| foreachvalue (const Slave* slave, slaves_.registered) { |
| if (!selectSlaveId_.accept(slave->id)) { |
| continue; |
| } |
| |
| writer->element([this, &slave](JSON::ObjectWriter* writer) { |
| writeSlave(slave, writer); |
| }); |
| } |
| }); |
| |
| writer->field("recovered_slaves", [this](JSON::ArrayWriter* writer) { |
| foreachvalue (const SlaveInfo& slaveInfo, slaves_.recovered) { |
| if (!selectSlaveId_.accept(slaveInfo.id())) { |
| continue; |
| } |
| |
| writer->element([&slaveInfo](JSON::ObjectWriter* writer) { |
| json(writer, slaveInfo); |
| }); |
| } |
| }); |
| } |
| |
| |
| void SlavesWriter::writeSlave( |
| const Slave* slave, JSON::ObjectWriter* writer) const |
| { |
| SlaveWriter( |
| *slave, |
| slaves_.draining.get(slave->id), |
| slaves_.deactivated.contains(slave->id), |
| approvers_)(writer); |
| |
| // Add the complete protobuf->JSON for all used, reserved, |
| // and offered resources. The other endpoints summarize |
| // resource information, which omits the details of |
| // reservations and persistent volumes. Full resource |
| // information is necessary so that operators can use the |
| // `/unreserve` and `/destroy-volumes` endpoints. |
| |
| hashmap<string, Resources> reserved = slave->totalResources.reservations(); |
| |
| writer->field( |
| "reserved_resources_full", |
| [&reserved, this](JSON::ObjectWriter* writer) { |
| foreachpair (const string& role, |
| const Resources& resources, |
| reserved) { |
| if (approvers_->approved<VIEW_ROLE>(role)) { |
| writer->field(role, [&resources, this]( |
| JSON::ArrayWriter* writer) { |
| foreach (Resource resource, resources) { |
| if (approvers_->approved<VIEW_ROLE>(resource)) { |
| convertResourceFormat(&resource, ENDPOINT); |
| writer->element(JSON::Protobuf(resource)); |
| } |
| } |
| }); |
| } |
| } |
| }); |
| |
| Resources unreservedResources = slave->totalResources.unreserved(); |
| |
| writer->field( |
| "unreserved_resources_full", |
| [&unreservedResources, this](JSON::ArrayWriter* writer) { |
| foreach (Resource resource, unreservedResources) { |
| if (approvers_->approved<VIEW_ROLE>(resource)) { |
| convertResourceFormat(&resource, ENDPOINT); |
| writer->element(JSON::Protobuf(resource)); |
| } |
| } |
| }); |
| |
| Resources usedResources = Resources::sum(slave->usedResources); |
| |
| writer->field( |
| "used_resources_full", |
| [&usedResources, this](JSON::ArrayWriter* writer) { |
| foreach (Resource resource, usedResources) { |
| if (approvers_->approved<VIEW_ROLE>(resource)) { |
| convertResourceFormat(&resource, ENDPOINT); |
| writer->element(JSON::Protobuf(resource)); |
| } |
| } |
| }); |
| |
| const Resources& offeredResources = slave->offeredResources; |
| |
| writer->field( |
| "offered_resources_full", |
| [&offeredResources, this](JSON::ArrayWriter* writer) { |
| foreach (Resource resource, offeredResources) { |
| if (approvers_->approved<VIEW_ROLE>(resource)) { |
| convertResourceFormat(&resource, ENDPOINT); |
| writer->element(JSON::Protobuf(resource)); |
| } |
| } |
| }); |
| } |
| |
| |
| void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary) |
| { |
| const Framework& framework = summary; |
| |
| writer->field("id", framework.id().value()); |
| writer->field("name", framework.info.name()); |
| |
| // Omit pid for http frameworks. |
| if (framework.pid().isSome()) { |
| writer->field("pid", string(framework.pid().get())); |
| } |
| |
| // TODO(bmahler): Use these in the webui. |
| writer->field("used_resources", framework.totalUsedResources); |
| writer->field("offered_resources", framework.totalOfferedResources); |
| writer->field("capabilities", framework.info.capabilities()); |
| writer->field("hostname", framework.info.hostname()); |
| writer->field("webui_url", framework.info.webui_url()); |
| writer->field("active", framework.active()); |
| writer->field("connected", framework.connected()); |
| writer->field("recovered", framework.recovered()); |
| } |
| |
| |
| // This abstraction has no side-effects. It factors out computing the |
| // mapping from 'slaves' to 'frameworks' to answer the questions 'what |
| // frameworks are running on a given slave?' and 'what slaves are |
| // running the given framework?'. |
| class SlaveFrameworkMapping |
| { |
| public: |
| SlaveFrameworkMapping(const hashmap<FrameworkID, Framework*>& frameworks) |
| { |
| foreachpair (const FrameworkID& frameworkId, |
| const Framework* framework, |
| frameworks) { |
| foreachvalue (const Task* task, framework->tasks) { |
| frameworksToSlaves[frameworkId].insert(task->slave_id()); |
| slavesToFrameworks[task->slave_id()].insert(frameworkId); |
| } |
| |
| foreachvalue (const Owned<Task>& task, framework->unreachableTasks) { |
| frameworksToSlaves[frameworkId].insert(task->slave_id()); |
| slavesToFrameworks[task->slave_id()].insert(frameworkId); |
| } |
| |
| foreach (const Owned<Task>& task, framework->completedTasks) { |
| frameworksToSlaves[frameworkId].insert(task->slave_id()); |
| slavesToFrameworks[task->slave_id()].insert(frameworkId); |
| } |
| } |
| } |
| |
| const hashset<FrameworkID>& frameworks(const SlaveID& slaveId) const |
| { |
| const auto iterator = slavesToFrameworks.find(slaveId); |
| return iterator != slavesToFrameworks.end() ? |
| iterator->second : hashset<FrameworkID>::EMPTY; |
| } |
| |
| const hashset<SlaveID>& slaves(const FrameworkID& frameworkId) const |
| { |
| const auto iterator = frameworksToSlaves.find(frameworkId); |
| return iterator != frameworksToSlaves.end() ? |
| iterator->second : hashset<SlaveID>::EMPTY; |
| } |
| |
| private: |
| hashmap<SlaveID, hashset<FrameworkID>> slavesToFrameworks; |
| hashmap<FrameworkID, hashset<SlaveID>> frameworksToSlaves; |
| }; |
| |
| |
| // This abstraction has no side-effects. It factors out the accounting |
| // for a 'TaskState' summary. We use this to summarize 'TaskState's |
| // for both frameworks as well as slaves. |
| struct TaskStateSummary |
| { |
| // TODO(jmlvanre): Possibly clean this up as per MESOS-2694. |
| const static TaskStateSummary EMPTY; |
| |
| TaskStateSummary() |
| : staging(0), |
| starting(0), |
| running(0), |
| killing(0), |
| finished(0), |
| killed(0), |
| failed(0), |
| lost(0), |
| error(0), |
| dropped(0), |
| unreachable(0), |
| gone(0), |
| gone_by_operator(0), |
| unknown(0) {} |
| |
| // Account for the state of the given task. |
| void count(const Task& task) |
| { |
| switch (task.state()) { |
| case TASK_STAGING: { ++staging; break; } |
| case TASK_STARTING: { ++starting; break; } |
| case TASK_RUNNING: { ++running; break; } |
| case TASK_KILLING: { ++killing; break; } |
| case TASK_FINISHED: { ++finished; break; } |
| case TASK_KILLED: { ++killed; break; } |
| case TASK_FAILED: { ++failed; break; } |
| case TASK_LOST: { ++lost; break; } |
| case TASK_ERROR: { ++error; break; } |
| case TASK_DROPPED: { ++dropped; break; } |
| case TASK_UNREACHABLE: { ++unreachable; break; } |
| case TASK_GONE: { ++gone; break; } |
| case TASK_GONE_BY_OPERATOR: { ++gone_by_operator; break; } |
| case TASK_UNKNOWN: { ++unknown; break; } |
| // No default case allows for a helpful compiler error if we |
| // introduce a new state. |
| } |
| } |
| |
| size_t staging; |
| size_t starting; |
| size_t running; |
| size_t killing; |
| size_t finished; |
| size_t killed; |
| size_t failed; |
| size_t lost; |
| size_t error; |
| size_t dropped; |
| size_t unreachable; |
| size_t gone; |
| size_t gone_by_operator; |
| size_t unknown; |
| }; |
| |
| |
| const TaskStateSummary TaskStateSummary::EMPTY; |
| |
| |
| // This abstraction has no side-effects. It factors out computing the |
| // 'TaskState' summaries for frameworks and slaves. This answers the |
| // questions 'How many tasks are in each state for a given framework?' |
| // and 'How many tasks are in each state for a given slave?'. |
| class TaskStateSummaries |
| { |
| public: |
| TaskStateSummaries(const hashmap<FrameworkID, Framework*>& frameworks) |
| { |
| foreachpair (const FrameworkID& frameworkId, |
| const Framework* framework, |
| frameworks) { |
| foreachvalue (const Task* task, framework->tasks) { |
| frameworkTaskSummaries[frameworkId].count(*task); |
| slaveTaskSummaries[task->slave_id()].count(*task); |
| } |
| |
| foreachvalue (const Owned<Task>& task, framework->unreachableTasks) { |
| frameworkTaskSummaries[frameworkId].count(*task); |
| slaveTaskSummaries[task->slave_id()].count(*task); |
| } |
| |
| foreach (const Owned<Task>& task, framework->completedTasks) { |
| frameworkTaskSummaries[frameworkId].count(*task); |
| slaveTaskSummaries[task->slave_id()].count(*task); |
| } |
| } |
| } |
| |
| const TaskStateSummary& framework(const FrameworkID& frameworkId) const |
| { |
| const auto iterator = frameworkTaskSummaries.find(frameworkId); |
| return iterator != frameworkTaskSummaries.end() ? |
| iterator->second : TaskStateSummary::EMPTY; |
| } |
| |
| const TaskStateSummary& slave(const SlaveID& slaveId) const |
| { |
| const auto iterator = slaveTaskSummaries.find(slaveId); |
| return iterator != slaveTaskSummaries.end() ? |
| iterator->second : TaskStateSummary::EMPTY; |
| } |
| |
| private: |
| hashmap<FrameworkID, TaskStateSummary> frameworkTaskSummaries; |
| hashmap<SlaveID, TaskStateSummary> slaveTaskSummaries; |
| }; |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::frameworks( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| CHECK_EQ(outputContentType, ContentType::JSON); |
| |
| IDAcceptor<FrameworkID> selectFrameworkId( |
| query.get("framework_id")); |
| |
| // This lambda is consumed before the outer lambda |
| // returns, hence capture by reference is fine here. |
| const Master* master = this->master; |
| auto frameworks = [master, &approvers, &selectFrameworkId]( |
| JSON::ObjectWriter* writer) { |
| // Model all of the frameworks. |
| writer->field( |
| "frameworks", |
| [master, &approvers, &selectFrameworkId]( |
| JSON::ArrayWriter* writer) { |
| foreachvalue ( |
| Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks or frameworks |
| // without a matching ID. |
| if (!selectFrameworkId.accept(framework->id()) || |
| !approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| writer->element(FullFrameworkWriter(approvers, framework)); |
| } |
| }); |
| |
| // Model all of the completed frameworks. |
| writer->field( |
| "completed_frameworks", |
| [master, &approvers, &selectFrameworkId]( |
| JSON::ArrayWriter* writer) { |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks or frameworks |
| // without a matching ID. |
| if (!selectFrameworkId.accept(framework->id()) || |
| !approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| writer->element( |
| FullFrameworkWriter(approvers, framework.get())); |
| } |
| }); |
| |
| // Unregistered frameworks are no longer possible. We emit an |
| // empty array for the sake of backward compatibility. |
| writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {}); |
| }; |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(jsonify(frameworks), query.get("jsonp")), |
| None()); |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::roles( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| CHECK_EQ(outputContentType, ContentType::JSON); |
| |
| const Master* master = this->master; |
| |
| const vector<string> knownRoles = master->knownRoles(); |
| |
| auto roles = [&](JSON::ObjectWriter* writer) { |
| writer->field( |
| "roles", |
| [&](JSON::ArrayWriter* writer) { |
| foreach (const string& name, knownRoles) { |
| if (!approvers->approved<VIEW_ROLE>(name)) { |
| continue; |
| } |
| |
| writer->element([&](JSON::ObjectWriter* writer) { |
| writer->field("name", name); |
| |
| writer->field( |
| "weight", |
| master->weights.get(name).getOrElse(DEFAULT_WEIGHT)); |
| |
| Option<Role*> role = master->roles.get(name); |
| |
| RoleResourceBreakdown resourceBreakdown(master, name); |
| |
| // Prior to Mesos 1.9, this field is filled based on |
| // `QuotaInfo` which is now deprecated. For backward |
| // compatibility reasons, we do not use any formatter |
| // for the new struct but construct the response by hand. |
| // Specifically: |
| // |
| // - We keep the `role` field which was present in the |
| // `QuotaInfo`. |
| // |
| // - We name the field using singular `guarantee` and `limit` |
| // which is different from the plural used in `QuotaConfig`. |
| const Quota quota = master->quotas.get(name).getOrElse(Quota()); |
| |
| writer->field("quota", [&](JSON::ObjectWriter* writer) { |
| writer->field("role", name); |
| |
| writer->field("guarantee", quota.guarantees); |
| writer->field("limit", quota.limits); |
| writer->field("consumed", resourceBreakdown.consumedQuota()); |
| }); |
| |
| ResourceQuantities allocated = resourceBreakdown.allocated(); |
| ResourceQuantities offered = resourceBreakdown.offered(); |
| |
| // Deprecated by allocated, offered, reserved. |
| writer->field("resources", allocated + offered); |
| |
| writer->field("allocated", allocated); |
| writer->field("offered", offered); |
| writer->field("reserved", resourceBreakdown.reserved()); |
| |
| if (role.isNone()) { |
| writer->field("frameworks", [](JSON::ArrayWriter*) {}); |
| } else { |
| writer->field("frameworks", [&](JSON::ArrayWriter* writer) { |
| foreachkey (const FrameworkID& id, (*role)->frameworks) { |
| writer->element(id.value()); |
| } |
| }); |
| } |
| }); |
| } |
| }); |
| }; |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(jsonify(roles), query.get("jsonp")), |
| None()); |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::slaves( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| CHECK_EQ(outputContentType, ContentType::JSON); |
| |
| IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id")); |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)), |
| query.get("jsonp")), |
| None()); |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::state( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| CHECK_EQ(outputContentType, ContentType::JSON); |
| |
| const Master* master = this->master; |
| auto calculateState = [master, &approvers](JSON::ObjectWriter* writer) { |
| writer->field("version", MESOS_VERSION); |
| |
| if (build::GIT_SHA.isSome()) { |
| writer->field("git_sha", build::GIT_SHA.get()); |
| } |
| |
| if (build::GIT_BRANCH.isSome()) { |
| writer->field("git_branch", build::GIT_BRANCH.get()); |
| } |
| |
| if (build::GIT_TAG.isSome()) { |
| writer->field("git_tag", build::GIT_TAG.get()); |
| } |
| |
| writer->field("build_date", build::DATE); |
| writer->field("build_time", build::TIME); |
| writer->field("build_user", build::USER); |
| writer->field("start_time", master->startTime.secs()); |
| |
| if (master->electedTime.isSome()) { |
| writer->field("elected_time", master->electedTime->secs()); |
| } |
| |
| writer->field("id", master->info().id()); |
| writer->field("pid", string(master->self())); |
| writer->field("hostname", master->info().hostname()); |
| writer->field("capabilities", master->info().capabilities()); |
| writer->field("activated_slaves", master->_const_slaves_active()); |
| writer->field("deactivated_slaves", master->_const_slaves_inactive()); |
| writer->field("unreachable_slaves", master->_const_slaves_unreachable()); |
| |
| if (master->info().has_domain()) { |
| writer->field("domain", master->info().domain()); |
| } |
| |
| // TODO(haosdent): Deprecated this in favor of `leader_info` below. |
| if (master->leader.isSome()) { |
| writer->field("leader", master->leader->pid()); |
| } |
| |
| if (master->leader.isSome()) { |
| writer->field("leader_info", [master](JSON::ObjectWriter* writer) { |
| json(writer, master->leader.get()); |
| }); |
| } |
| |
| if (approvers->approved<VIEW_FLAGS>()) { |
| if (master->flags.cluster.isSome()) { |
| writer->field("cluster", master->flags.cluster.get()); |
| } |
| |
| if (master->flags.log_dir.isSome()) { |
| writer->field("log_dir", master->flags.log_dir.get()); |
| } |
| |
| if (master->flags.external_log_file.isSome()) { |
| writer->field("external_log_file", |
| master->flags.external_log_file.get()); |
| } |
| |
| writer->field("flags", [master](JSON::ObjectWriter* writer) { |
| foreachvalue (const flags::Flag& flag, master->flags) { |
| Option<string> value = flag.stringify(master->flags); |
| if (value.isSome()) { |
| writer->field(flag.effective_name().value, value.get()); |
| } |
| } |
| }); |
| } |
| |
| // Model all of the registered slaves. |
| writer->field( |
| "slaves", |
| [master, &approvers](JSON::ArrayWriter* writer) { |
| foreachvalue (Slave* slave, master->slaves.registered) { |
| writer->element(SlaveWriter( |
| *slave, |
| master->slaves.draining.get(slave->id), |
| master->slaves.deactivated.contains(slave->id), |
| approvers)); |
| } |
| }); |
| |
| // Model all of the recovered slaves. |
| writer->field( |
| "recovered_slaves", |
| [master](JSON::ArrayWriter* writer) { |
| foreachvalue ( |
| const SlaveInfo& slaveInfo, master->slaves.recovered) { |
| writer->element([&slaveInfo](JSON::ObjectWriter* writer) { |
| json(writer, slaveInfo); |
| }); |
| } |
| }); |
| |
| // Model all of the frameworks. |
| writer->field( |
| "frameworks", |
| [master, &approvers](JSON::ArrayWriter* writer) { |
| foreachvalue ( |
| Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| writer->element(FullFrameworkWriter(approvers, framework)); |
| } |
| }); |
| |
| // Model all of the completed frameworks. |
| writer->field( |
| "completed_frameworks", |
| [master, &approvers](JSON::ArrayWriter* writer) { |
| foreachvalue ( |
| const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| writer->element( |
| FullFrameworkWriter(approvers, framework.get())); |
| } |
| }); |
| |
| // Orphan tasks are no longer possible. We emit an empty array |
| // for the sake of backward compatibility. |
| writer->field("orphan_tasks", [](JSON::ArrayWriter*) {}); |
| |
| // Unregistered frameworks are no longer possible. We emit an |
| // empty array for the sake of backward compatibility. |
| writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {}); |
| }; |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(jsonify(calculateState), query.get("jsonp")), |
| None()); |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::stateSummary( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| CHECK_EQ(outputContentType, ContentType::JSON); |
| |
| const Master* master = this->master; |
| auto stateSummary = [master, &approvers](JSON::ObjectWriter* writer) { |
| writer->field("hostname", master->info().hostname()); |
| |
| if (master->flags.cluster.isSome()) { |
| writer->field("cluster", master->flags.cluster.get()); |
| } |
| |
| // We use the tasks in the 'Frameworks' struct to compute summaries |
| // for this endpoint. This is done 1) for consistency between the |
| // 'slaves' and 'frameworks' subsections below 2) because we want to |
| // provide summary information for frameworks that are currently |
| // registered 3) the frameworks keep a circular buffer of completed |
| // tasks that we can use to keep a limited view on the history of |
| // recent completed / failed tasks. |
| |
| // Generate mappings from 'slave' to 'framework' and reverse. |
| SlaveFrameworkMapping slaveFrameworkMapping( |
| master->frameworks.registered); |
| |
| // Generate 'TaskState' summaries for all framework and slave ids. |
| TaskStateSummaries taskStateSummaries( |
| master->frameworks.registered); |
| |
| // Model all of the slaves. |
| writer->field( |
| "slaves", |
| [master, |
| &slaveFrameworkMapping, |
| &taskStateSummaries, |
| &approvers](JSON::ArrayWriter* writer) { |
| foreachvalue (Slave* slave, master->slaves.registered) { |
| writer->element( |
| [&slave, |
| &master, |
| &slaveFrameworkMapping, |
| &taskStateSummaries, |
| &approvers](JSON::ObjectWriter* writer) { |
| SlaveWriter slaveWriter( |
| *slave, |
| master->slaves.draining.get(slave->id), |
| master->slaves.deactivated.contains(slave->id), |
| approvers); |
| slaveWriter(writer); |
| |
| // Add the 'TaskState' summary for this slave. |
| const TaskStateSummary& summary = |
| taskStateSummaries.slave(slave->id); |
| |
| // Certain per-agent status totals will always be zero |
| // (e.g., TASK_ERROR, TASK_UNREACHABLE). We report |
| // them here anyway, for completeness. |
| // |
| // TODO(neilc): Update for TASK_GONE and |
| // TASK_GONE_BY_OPERATOR. |
| writer->field("TASK_STAGING", summary.staging); |
| writer->field("TASK_STARTING", summary.starting); |
| writer->field("TASK_RUNNING", summary.running); |
| writer->field("TASK_KILLING", summary.killing); |
| writer->field("TASK_FINISHED", summary.finished); |
| writer->field("TASK_KILLED", summary.killed); |
| writer->field("TASK_FAILED", summary.failed); |
| writer->field("TASK_LOST", summary.lost); |
| writer->field("TASK_ERROR", summary.error); |
| writer->field( |
| "TASK_UNREACHABLE", |
| summary.unreachable); |
| |
| // Add the ids of all the frameworks running on this |
| // slave. |
| const hashset<FrameworkID>& frameworks = |
| slaveFrameworkMapping.frameworks(slave->id); |
| |
| writer->field( |
| "framework_ids", |
| [&frameworks](JSON::ArrayWriter* writer) { |
| foreach ( |
| const FrameworkID& frameworkId, |
| frameworks) { |
| writer->element(frameworkId.value()); |
| } |
| }); |
| }); |
| } |
| }); |
| |
| // Model all of the frameworks. |
| writer->field( |
| "frameworks", |
| [master, |
| &slaveFrameworkMapping, |
| &taskStateSummaries, |
| &approvers](JSON::ArrayWriter* writer) { |
| foreachpair (const FrameworkID& frameworkId, |
| Framework* framework, |
| master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| writer->element( |
| [&frameworkId, |
| &framework, |
| &slaveFrameworkMapping, |
| &taskStateSummaries](JSON::ObjectWriter* writer) { |
| json(writer, Summary<Framework>(*framework)); |
| |
| // Add the 'TaskState' summary for this framework. |
| const TaskStateSummary& summary = |
| taskStateSummaries.framework(frameworkId); |
| |
| // TODO(neilc): Update for TASK_GONE and |
| // TASK_GONE_BY_OPERATOR. |
| writer->field("TASK_STAGING", summary.staging); |
| writer->field("TASK_STARTING", summary.starting); |
| writer->field("TASK_RUNNING", summary.running); |
| writer->field("TASK_KILLING", summary.killing); |
| writer->field("TASK_FINISHED", summary.finished); |
| writer->field("TASK_KILLED", summary.killed); |
| writer->field("TASK_FAILED", summary.failed); |
| writer->field("TASK_LOST", summary.lost); |
| writer->field("TASK_ERROR", summary.error); |
| writer->field( |
| "TASK_UNREACHABLE", |
| summary.unreachable); |
| |
| // Add the ids of all the slaves running |
| // this framework. |
| const hashset<SlaveID>& slaves = |
| slaveFrameworkMapping.slaves(frameworkId); |
| |
| writer->field( |
| "slave_ids", |
| [&slaves](JSON::ArrayWriter* writer) { |
| foreach (const SlaveID& slaveId, slaves) { |
| writer->element(slaveId.value()); |
| } |
| }); |
| }); |
| } |
| }); |
| }; |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(jsonify(stateSummary), query.get("jsonp")), |
| None()); |
| } |
| |
| |
| struct TaskComparator |
| { |
| static bool ascending(const Task* lhs, const Task* rhs) |
| { |
| size_t lhsSize = lhs->statuses().size(); |
| size_t rhsSize = rhs->statuses().size(); |
| |
| if ((lhsSize == 0) && (rhsSize == 0)) { |
| return false; |
| } |
| |
| if (lhsSize == 0) { |
| return true; |
| } |
| |
| if (rhsSize == 0) { |
| return false; |
| } |
| |
| return (lhs->statuses(0).timestamp() < rhs->statuses(0).timestamp()); |
| } |
| |
| static bool descending(const Task* lhs, const Task* rhs) |
| { |
| size_t lhsSize = lhs->statuses().size(); |
| size_t rhsSize = rhs->statuses().size(); |
| |
| if ((lhsSize == 0) && (rhsSize == 0)) { |
| return false; |
| } |
| |
| if (rhsSize == 0) { |
| return true; |
| } |
| |
| if (lhsSize == 0) { |
| return false; |
| } |
| |
| return (lhs->statuses(0).timestamp() > rhs->statuses(0).timestamp()); |
| } |
| }; |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::tasks( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| CHECK_EQ(outputContentType, ContentType::JSON); |
| |
| // Get list options (limit and offset). |
| Result<int> result = numify<int>(query.get("limit")); |
| size_t limit = result.isSome() ? result.get() : TASK_LIMIT; |
| |
| result = numify<int>(query.get("offset")); |
| size_t offset = result.isSome() ? result.get() : 0; |
| |
| Option<string> order = query.get("order"); |
| string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des"; |
| |
| Option<string> frameworkId = query.get("framework_id"); |
| Option<string> taskId = query.get("task_id"); |
| |
| IDAcceptor<FrameworkID> selectFrameworkId(frameworkId); |
| IDAcceptor<TaskID> selectTaskId(taskId); |
| |
| // Construct framework list with both active and completed frameworks. |
| vector<const Framework*> frameworks; |
| foreachvalue (const Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks or frameworks without matching |
| // framework ID. |
| if (!selectFrameworkId.accept(framework->id()) || |
| !approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| frameworks.push_back(framework); |
| } |
| |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks or frameworks without matching |
| // framework ID. |
| if (!selectFrameworkId.accept(framework->id()) || |
| !approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| frameworks.push_back(framework.get()); |
| } |
| |
| // Construct task list with both running, |
| // completed and unreachable tasks. |
| vector<const Task*> tasks; |
| foreach (const Framework* framework, frameworks) { |
| foreachvalue (Task* task, framework->tasks) { |
| // Skip unauthorized tasks or tasks without matching task ID. |
| if (!selectTaskId.accept(task->task_id()) || |
| !approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| tasks.push_back(task); |
| } |
| |
| foreachvalue ( |
| const Owned<Task>& task, |
| framework->unreachableTasks) { |
| // Skip unauthorized tasks or tasks without matching task ID. |
| if (!selectTaskId.accept(task->task_id()) || |
| !approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| tasks.push_back(task.get()); |
| } |
| |
| foreach (const Owned<Task>& task, framework->completedTasks) { |
| // Skip unauthorized tasks or tasks without matching task ID. |
| if (!selectTaskId.accept(task->task_id()) || |
| !approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| tasks.push_back(task.get()); |
| } |
| } |
| |
| // Sort tasks by task status timestamp. Default order is descending. |
| // The earliest timestamp is chosen for comparison when |
| // multiple are present. |
| if (_order == "asc") { |
| sort(tasks.begin(), tasks.end(), TaskComparator::ascending); |
| } else { |
| sort(tasks.begin(), tasks.end(), TaskComparator::descending); |
| } |
| |
| auto tasksWriter = |
| [&tasks, limit, offset](JSON::ObjectWriter* writer) { |
| writer->field( |
| "tasks", |
| [&tasks, limit, offset](JSON::ArrayWriter* writer) { |
| // Collect 'limit' number of tasks starting from 'offset'. |
| size_t end = std::min(offset + limit, tasks.size()); |
| for (size_t i = offset; i < end; i++) { |
| writer->element(*tasks[i]); |
| } |
| }); |
| }; |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(jsonify(tasksWriter), query.get("jsonp")), |
| None()); |
| } |
| |
| |
| function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetAgents( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following: |
| // |
| // mesos::master::Response::GetAgents getAgents; |
| // for each registered agent: |
| // *getAgents.add_agents() = protobuf::master::event::createAgentResponse( |
| // agent, |
| // master->slaves.draining.get(slave->id), |
| // master->slaves.deactivated.contains(slave->id), |
| // approvers); |
| // for each recovered agent: |
| // SlaveInfo* agent = getAgents.add_recovered_agents(); |
| // agent->CopyFrom(slaveInfo); |
| // agent->clear_resources(); |
| // foreach (const Resource& resource, slaveInfo.resources()): |
| // if (approvers->approved<VIEW_ROLE>(resource)): |
| // *agent->add_resources() = resource; |
| |
| // TODO(bmahler): This copies the Owned object approvers. |
| return [=](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::GetAgents::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::GetAgents::kAgentsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreachvalue (const Slave* slave, master->slaves.registered) { |
| // TODO(bmahler): Consider not constructing the temporary |
| // agent object and instead serialize directly. |
| mesos::master::Response::GetAgents::Agent agent = |
| protobuf::master::event::createAgentResponse( |
| *slave, |
| master->slaves.draining.get(slave->id), |
| master->slaves.deactivated.contains(slave->id), |
| approvers); |
| |
| writer->element(asV1Protobuf(agent)); |
| } |
| }); |
| |
| field = v1::master::Response::GetAgents::kRecoveredAgentsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) { |
| // TODO(bmahler): Consider not constructing the temporary |
| // SlaveInfo object and instead serialize directly. |
| SlaveInfo agent = slaveInfo; |
| agent.clear_resources(); |
| foreach (const Resource& resource, slaveInfo.resources()) { |
| if (approvers->approved<VIEW_ROLE>(resource)) { |
| *agent.add_resources() = resource; |
| } |
| } |
| |
| writer->element(asV1Protobuf(agent)); |
| } |
| }); |
| }; |
| } |
| |
| |
| string Master::ReadOnlyHandler::serializeGetAgents( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following: |
| // |
| // mesos::master::Response::GetAgents getAgents; |
| // for each registered agent: |
| // *getAgents.add_agents() = protobuf::master::event::createAgentResponse( |
| // agent, |
| // master->slaves.draining.get(slave->id), |
| // master->slaves.deactivated.contains(slave->id), |
| // approvers); |
| // for each recovered agent: |
| // SlaveInfo* agent = getAgents.add_recovered_agents(); |
| // agent->CopyFrom(slaveInfo); |
| // agent->clear_resources(); |
| // foreach (const Resource& resource, slaveInfo.resources()): |
| // if (approvers->approved<VIEW_ROLE>(resource)): |
| // *agent->add_resources() = resource; |
| |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| foreachvalue (const Slave* slave, master->slaves.registered) { |
| // TODO(bmahler): Consider not constructing the temporary |
| // agent object and instead serialize directly. |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::master::Response::GetAgents::kAgentsFieldNumber, |
| protobuf::master::event::createAgentResponse( |
| *slave, |
| master->slaves.draining.get(slave->id), |
| master->slaves.deactivated.contains(slave->id), |
| approvers), |
| &writer); |
| } |
| |
| foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) { |
| // TODO(bmahler): Consider not constructing the temporary |
| // SlaveInfo object and instead serialize directly. |
| SlaveInfo agent = slaveInfo; |
| agent.clear_resources(); |
| foreach (const Resource& resource, slaveInfo.resources()) { |
| if (approvers->approved<VIEW_ROLE>(resource)) { |
| *agent.add_resources() = resource; |
| } |
| } |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::master::Response::GetAgents::kRecoveredAgentsFieldNumber, |
| agent, |
| &writer); |
| } |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| } |
| |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getAgents( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Response response; |
| // response.set_type(mesos::master::Response::GET_AGENTS); |
| // *response.mutable_get_agents() = _getAgents(approvers); |
| |
| switch (outputContentType) { |
| case ContentType::PROTOBUF: { |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteEnum( |
| mesos::v1::master::Response::kTypeFieldNumber, |
| mesos::v1::master::Response::GET_AGENTS, |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::kGetAgentsFieldNumber, |
| serializeGetAgents(approvers), |
| &writer); |
| |
| // We must manually trim the unused buffer space since |
| // we use the string before the coded output stream is |
| // destructed. |
| writer.Trim(); |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(output), stringify(outputContentType)), |
| None()); |
| } |
| |
| case ContentType::JSON: { |
| string body = jsonify([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::kTypeFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| v1::master::Response::Type_Name( |
| v1::master::Response::GET_AGENTS)); |
| |
| field = v1::master::Response::kGetAgentsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetAgents(approvers)); |
| }); |
| |
| // TODO(bmahler): Pass jsonp query parameter through here. |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(body), stringify(outputContentType)), |
| None()); |
| } |
| |
| default: |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| NotAcceptable("Request must accept json or protobuf"), None()); |
| } |
| } |
| |
| |
| function<void(JSON::ObjectWriter*)> |
| Master::ReadOnlyHandler::jsonifyGetFrameworks( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following: |
| // |
| // mesos::master::Response::GetFrameworks getFrameworks; |
| // for each framework: |
| // *getFrameworks.add_frameworks() = model(*framework); |
| // for each completed framework: |
| // *getFrameworks.add_completed_frameworks() = model(*framework); |
| |
| // TODO(bmahler): Consider not constructing the temporary framework |
| // objects and instead serialize directly, but since we don't |
| // expect a large number of pending tasks, we currently don't |
| // bother with the more efficient approach. |
| |
| // TODO(bmahler): This copies the Owned object approvers. |
| return [=](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::GetFrameworks::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::GetFrameworks::kFrameworksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreachvalue (const Framework* framework, |
| master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| mesos::master::Response::GetFrameworks::Framework f = model(*framework); |
| writer->element(asV1Protobuf(f)); |
| } |
| }); |
| |
| field = |
| v1::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| mesos::master::Response::GetFrameworks::Framework f = model(*framework); |
| writer->element(asV1Protobuf(f)); |
| } |
| }); |
| }; |
| } |
| |
| |
| string Master::ReadOnlyHandler::serializeGetFrameworks( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following: |
| // |
| // mesos::master::Response::GetFrameworks getFrameworks; |
| // for each framework: |
| // *getFrameworks.add_frameworks() = model(*framework); |
| // for each completed framework: |
| // *getFrameworks.add_completed_frameworks() = model(*framework); |
| |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| // TODO(bmahler): Consider not constructing the temporary framework |
| // objects and instead serialize directly, but since we don't |
| // expect a large number of pending tasks, we currently don't |
| // bother with the more efficient approach. |
| |
| foreachvalue (const Framework* framework, |
| master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::master::Response::GetFrameworks::kFrameworksFieldNumber, |
| model(*framework), |
| &writer); |
| } |
| |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| continue; |
| } |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::master::Response::GetFrameworks::kCompletedFrameworksFieldNumber, |
| model(*framework), |
| &writer); |
| } |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getFrameworks( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Response response; |
| // response.set_type(mesos::master::Response::GET_FRAMEWORKS); |
| // *response.mutable_get_frameworks() = _getFrameworks(approvers); |
| |
| switch (outputContentType) { |
| case ContentType::PROTOBUF: { |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteEnum( |
| mesos::v1::master::Response::kTypeFieldNumber, |
| mesos::v1::master::Response::GET_FRAMEWORKS, |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::kGetFrameworksFieldNumber, |
| serializeGetFrameworks(approvers), |
| &writer); |
| |
| // We must manually trim the unused buffer space since |
| // we use the string before the coded output stream is |
| // destructed. |
| writer.Trim(); |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(output), stringify(outputContentType)), |
| None()); |
| } |
| |
| case ContentType::JSON: { |
| string body = jsonify([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::kTypeFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| v1::master::Response::Type_Name( |
| v1::master::Response::GET_FRAMEWORKS)); |
| |
| field = v1::master::Response::kGetFrameworksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetFrameworks(approvers)); |
| }); |
| |
| // TODO(bmahler): Pass jsonp query parameter through here. |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(body), stringify(outputContentType)), |
| None()); |
| } |
| |
| default: |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| NotAcceptable("Request must accept json or protobuf"), None()); |
| } |
| } |
| |
| |
| function<void(JSON::ObjectWriter*)> |
| Master::ReadOnlyHandler::jsonifyGetExecutors( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following: |
| // |
| // mesos::master::Response::GetExecutors getExecutors; |
| // |
| // for each (executor, agent): |
| // mesos::master::Response::GetExecutors::Executor* executor = |
| // getExecutors.add_executors(); |
| // *executor->mutable_executor_info() = executorInfo; |
| // *executor->mutable_slave_id() = slaveId; |
| |
| // TODO(bmahler): This copies the owned object approvers. |
| return [=](JSON::ObjectWriter* writer) { |
| // Construct framework list with both active and completed frameworks. |
| vector<const Framework*> frameworks; |
| foreachvalue (const Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework); |
| } |
| } |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework.get()); |
| } |
| } |
| |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::GetExecutors::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::GetExecutors::kExecutorsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreach (const Framework* framework, frameworks) { |
| foreachpair (const SlaveID& slaveId, |
| const auto& executorsMap, |
| framework->executors) { |
| foreachvalue (const ExecutorInfo& executorInfo, executorsMap) { |
| // Skip unauthorized executors. |
| if (!approvers->approved<VIEW_EXECUTOR>( |
| executorInfo, framework->info)) { |
| continue; |
| } |
| |
| writer->element([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::GetExecutors::Executor::descriptor(); |
| |
| // Serialize the following message: |
| // |
| // mesos::master::Response::GetExecutors::Executor executor; |
| // *executor.mutable_executor_info() = executorInfo; |
| // *executor.mutable_slave_id() = slaveId; |
| int field; |
| |
| field = v1::master::Response::GetExecutors::Executor |
| ::kExecutorInfoFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| asV1Protobuf(executorInfo)); |
| |
| field = v1::master::Response::GetExecutors::Executor |
| ::kAgentIdFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| asV1Protobuf(slaveId)); |
| }); |
| } |
| } |
| } |
| }); |
| }; |
| } |
| |
| |
| string Master::ReadOnlyHandler::serializeGetExecutors( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Construct framework list with both active and completed frameworks. |
| vector<const Framework*> frameworks; |
| foreachvalue (const Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework); |
| } |
| } |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework.get()); |
| } |
| } |
| |
| // Lambda for serializing the following message: |
| // |
| // mesos::master::Response::GetExecutors::Executor executor; |
| // *executor.mutable_executor_info() = executorInfo; |
| // *executor.mutable_slave_id() = slaveId; |
| auto serializeExecutor = [](const ExecutorInfo& e, const SlaveID& s) { |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::v1::master::Response::GetExecutors::Executor |
| ::kExecutorInfoFieldNumber, |
| e, |
| &writer); |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::v1::master::Response::GetExecutors::Executor |
| ::kAgentIdFieldNumber, |
| s, |
| &writer); |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| }; |
| |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| // Serialize the following: |
| // |
| // mesos::master::Response::GetExecutors getExecutors; |
| // |
| // for each (executor, agent): |
| // mesos::master::Response::GetExecutors::Executor* executor = |
| // getExecutors.add_executors(); |
| // *executor->mutable_executor_info() = executorInfo; |
| // *executor->mutable_slave_id() = slaveId; |
| |
| foreach (const Framework* framework, frameworks) { |
| foreachpair (const SlaveID& slaveId, |
| const auto& executorsMap, |
| framework->executors) { |
| foreachvalue (const ExecutorInfo& executorInfo, executorsMap) { |
| // Skip unauthorized executors. |
| if (!approvers->approved<VIEW_EXECUTOR>( |
| executorInfo, framework->info)) { |
| continue; |
| } |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::GetExecutors::kExecutorsFieldNumber, |
| serializeExecutor(executorInfo, slaveId), |
| &writer); |
| } |
| } |
| } |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getExecutors( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Response response; |
| // response.set_type(mesos::master::Response::GET_EXECUTORS); |
| // *response.mutable_get_executors() = _getExecutors(approvers); |
| |
| switch (outputContentType) { |
| case ContentType::PROTOBUF: { |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteEnum( |
| mesos::v1::master::Response::kTypeFieldNumber, |
| mesos::v1::master::Response::GET_EXECUTORS, |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::kGetExecutorsFieldNumber, |
| serializeGetExecutors(approvers), |
| &writer); |
| |
| // We must manually trim the unused buffer space since |
| // we use the string before the coded output stream is |
| // destructed. |
| writer.Trim(); |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(output), stringify(outputContentType)), |
| None()); |
| } |
| |
| case ContentType::JSON: { |
| string body = jsonify([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::kTypeFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| v1::master::Response::Type_Name( |
| v1::master::Response::GET_EXECUTORS)); |
| |
| field = v1::master::Response::kGetExecutorsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetExecutors(approvers)); |
| }); |
| |
| // TODO(bmahler): Pass jsonp query parameter through here. |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(body), stringify(outputContentType)), |
| None()); |
| } |
| |
| default: |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| NotAcceptable("Request must accept json or protobuf"), None()); |
| } |
| } |
| |
| |
| function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetTasks( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Jsonify the following message: |
| // |
| // master::Response::GetTasks getTasks; |
| // for each pending task: |
| // *getTasks.add_pending_tasks() = |
| // protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); |
| // for each task: |
| // *getTasks.add_tasks() = *task; |
| // for each unreachable task: |
| // *getTasks.add_unreachable_tasks() = *task; |
| // for each completed task: |
| // *getTasks.add_completed_tasks() = *task; |
| |
| // TODO(bmahler): This copies the Owned object approvers. |
| return [=](JSON::ObjectWriter* writer) { |
| // Construct framework list with both active and completed frameworks. |
| vector<const Framework*> frameworks; |
| foreachvalue (const Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework); |
| } |
| } |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework.get()); |
| } |
| } |
| |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::GetTasks::descriptor(); |
| |
| int field; |
| |
| // Active tasks. |
| field = v1::master::Response::GetTasks::kTasksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreach (const Framework* framework, frameworks) { |
| foreachvalue (const Task* task, framework->tasks) { |
| // Skip unauthorized tasks. |
| if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| writer->element(asV1Protobuf(*task)); |
| } |
| } |
| }); |
| |
| // Unreachable tasks. |
| field = v1::master::Response::GetTasks::kUnreachableTasksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreach (const Framework* framework, frameworks) { |
| foreachvalue (const Owned<Task>& task, |
| framework->unreachableTasks) { |
| // Skip unauthorized tasks. |
| if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| writer->element(asV1Protobuf(*task)); |
| } |
| } |
| }); |
| |
| // Completed tasks. |
| field = v1::master::Response::GetTasks::kCompletedTasksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| [&](JSON::ArrayWriter* writer) { |
| foreach (const Framework* framework, frameworks) { |
| foreach (const Owned<Task>& task, framework->completedTasks) { |
| // Skip unauthorized tasks. |
| if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| writer->element(asV1Protobuf(*task)); |
| } |
| } |
| }); |
| }; |
| } |
| |
| |
| string Master::ReadOnlyHandler::serializeGetTasks( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Construct framework list with both active and completed frameworks. |
| vector<const Framework*> frameworks; |
| foreachvalue (const Framework* framework, master->frameworks.registered) { |
| // Skip unauthorized frameworks. |
| if (approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework); |
| } |
| } |
| foreachvalue (const Owned<Framework>& framework, |
| master->frameworks.completed) { |
| // Skip unauthorized frameworks. |
| if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) { |
| frameworks.push_back(framework.get()); |
| } |
| } |
| |
| // Serialize the following message: |
| // |
| // mesos::master::Response::GetTasks getTasks; |
| // for each pending task: |
| // *getTasks.add_pending_tasks() = |
| // protobuf::createTask(taskInfo, TASK_STAGING, framework->id()); |
| // for each task: |
| // *getTasks.add_tasks() = *task; |
| // for each unreachable task: |
| // *getTasks.add_unreachable_tasks() = *task; |
| // for each completed task: |
| // *getTasks.add_completed_tasks() = *task; |
| |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| foreach (const Framework* framework, frameworks) { |
| // Active tasks. |
| foreachvalue (const Task* task, framework->tasks) { |
| // Skip unauthorized tasks. |
| if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::v1::master::Response::GetTasks::kTasksFieldNumber, |
| *task, |
| &writer); |
| } |
| |
| // Unreachable tasks. |
| foreachvalue (const Owned<Task>& task, framework->unreachableTasks) { |
| // Skip unauthorized tasks. |
| if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::v1::master::Response::GetTasks::kUnreachableTasksFieldNumber, |
| *task, |
| &writer); |
| } |
| |
| // Completed tasks. |
| foreach (const Owned<Task>& task, framework->completedTasks) { |
| // Skip unauthorized tasks. |
| if (!approvers->approved<VIEW_TASK>(*task, framework->info)) { |
| continue; |
| } |
| |
| WireFormatLite2::WriteMessageWithoutCachedSizes( |
| mesos::v1::master::Response::GetTasks::kCompletedTasksFieldNumber, |
| *task, |
| &writer); |
| } |
| } |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getTasks( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Response response; |
| // response.set_type(mesos::master::Response::GET_TASKS); |
| // *response.mutable_get_tasks() = _getTasks(approvers); |
| |
| switch (outputContentType) { |
| case ContentType::PROTOBUF: { |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteEnum( |
| mesos::v1::master::Response::kTypeFieldNumber, |
| mesos::v1::master::Response::GET_TASKS, |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::kGetTasksFieldNumber, |
| serializeGetTasks(approvers), |
| &writer); |
| |
| // We must manually trim the unused buffer space since |
| // we use the string before the coded output stream is |
| // destructed. |
| writer.Trim(); |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(output), stringify(outputContentType)), |
| None()); |
| } |
| |
| case ContentType::JSON: { |
| string body = jsonify([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::kTypeFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| v1::master::Response::Type_Name( |
| v1::master::Response::GET_TASKS)); |
| |
| field = v1::master::Response::kGetTasksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetTasks(approvers)); |
| }); |
| |
| // TODO(bmahler): Pass jsonp query parameter through here. |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(body), stringify(outputContentType)), |
| None()); |
| } |
| |
| default: |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| NotAcceptable("Request must accept json or protobuf"), None()); |
| } |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getOperations( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| // We consider a principal to be authorized to view an operation if it |
| // is authorized to view the resources the operation is performed on. |
| auto approved = [&approvers](const Operation& operation) { |
| Try<Resources> consumedResources = |
| protobuf::getConsumedResources(operation.info()); |
| |
| if (consumedResources.isError()) { |
| LOG(WARNING) |
| << "Could not approve operation " << operation.uuid() |
| << " since its consumed resources could not be determined:" |
| << consumedResources.error(); |
| |
| return false; |
| } |
| |
| foreach (const Resource& resource, consumedResources.get()) { |
| if (!approvers->approved<VIEW_ROLE>(resource)) { |
| return false; |
| } |
| } |
| |
| return true; |
| }; |
| |
| mesos::master::Response response; |
| response.set_type(mesos::master::Response::GET_OPERATIONS); |
| |
| mesos::master::Response::GetOperations* operations = |
| response.mutable_get_operations(); |
| |
| foreachvalue (const Slave* slave, master->slaves.registered) { |
| foreachvalue (const Operation* operation, slave->operations) { |
| if (approved(*operation)) { |
| operations->add_operations()->CopyFrom(*operation); |
| } |
| } |
| |
| foreachvalue ( |
| const Slave::ResourceProvider& resourceProvider, |
| slave->resourceProviders) { |
| foreachvalue (const Operation* operation, resourceProvider.operations) { |
| if (approved(*operation)) { |
| operations->add_operations()->CopyFrom(*operation); |
| } |
| } |
| } |
| } |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(serialize(outputContentType, evolve(response)), |
| stringify(outputContentType)), |
| None()); |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getRoles( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| const vector<string> knownRoles = master->knownRoles(); |
| |
| mesos::master::Response response; |
| response.set_type(mesos::master::Response::GET_ROLES); |
| |
| mesos::master::Response::GetRoles* getRoles = |
| response.mutable_get_roles(); |
| |
| foreach (const string& name, knownRoles) { |
| if (!approvers->approved<VIEW_ROLE>(name)) { |
| continue; |
| } |
| |
| mesos::Role* role = getRoles->add_roles(); |
| |
| role->set_name(name); |
| |
| role->set_weight(master->weights.get(name).getOrElse(DEFAULT_WEIGHT)); |
| |
| RoleResourceBreakdown resourceBreakdown(master, name); |
| |
| ResourceQuantities allocatedAndOffered = |
| resourceBreakdown.allocated() + resourceBreakdown.offered(); |
| |
| // `resources` will be deprecated in favor of |
| // `offered`, `allocated`, `reserved`, and quota consumption. |
| // As a result, we don't bother trying to expose more |
| // than {cpus, mem, disk, gpus} since we don't know if |
| // anything outside this set is of type SCALAR. |
| foreach (const auto& quantity, allocatedAndOffered) { |
| if (quantity.first == "cpus" || quantity.first == "mem" || |
| quantity.first == "disk" || quantity.first == "gpus") { |
| Resource* resource = role->add_resources(); |
| resource->set_name(quantity.first); |
| resource->set_type(Value::SCALAR); |
| *resource->mutable_scalar() = quantity.second; |
| } |
| } |
| |
| Option<Role*> role_ = master->roles.get(name); |
| |
| if (role_.isSome()) { |
| foreachkey (const FrameworkID& frameworkId, (*role_)->frameworks) { |
| *role->add_frameworks() = frameworkId; |
| } |
| } |
| } |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(serialize(outputContentType, evolve(response)), |
| stringify(outputContentType)), |
| None()); |
| } |
| |
| |
| function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifyGetState( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Jsonify the following message: |
| // |
| // mesos::master::Response::GetState getState; |
| // *getState.mutable_get_tasks() = _getTasks(approvers); |
| // *getState.mutable_get_executors() = _getExecutors(approvers); |
| // *getState.mutable_get_frameworks() = _getFrameworks(approvers); |
| // *getState.mutable_get_agents() = _getAgents(approvers); |
| |
| // TODO(bmahler): This copies the Owned object approvers. |
| return [=](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::GetState::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::GetState::kGetTasksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetTasks(approvers)); |
| |
| field = v1::master::Response::GetState::kGetExecutorsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetExecutors(approvers)); |
| |
| field = v1::master::Response::GetState::kGetFrameworksFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetFrameworks(approvers)); |
| |
| field = v1::master::Response::GetState::kGetAgentsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetAgents(approvers)); |
| }; |
| } |
| |
| |
| string Master::ReadOnlyHandler::serializeGetState( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Response::GetState getState; |
| // *getState.mutable_get_tasks() = _getTasks(approvers); |
| // *getState.mutable_get_executors() = _getExecutors(approvers); |
| // *getState.mutable_get_frameworks() = _getFrameworks(approvers); |
| // *getState.mutable_get_agents() = _getAgents(approvers); |
| |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::GetState::kGetTasksFieldNumber, |
| serializeGetTasks(approvers), |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::GetState::kGetExecutorsFieldNumber, |
| serializeGetExecutors(approvers), |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::GetState::kGetFrameworksFieldNumber, |
| serializeGetFrameworks(approvers), |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::GetState::kGetAgentsFieldNumber, |
| serializeGetAgents(approvers), |
| &writer); |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::getState( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Response response; |
| // response.set_type(mesos::master::Response::GET_STATE); |
| // *response.mutable_get_state() = _getState(approvers); |
| |
| switch (outputContentType) { |
| case ContentType::PROTOBUF: { |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteEnum( |
| mesos::v1::master::Response::kTypeFieldNumber, |
| mesos::v1::master::Response::GET_STATE, |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Response::kGetStateFieldNumber, |
| serializeGetState(approvers), |
| &writer); |
| |
| // We must manually trim the unused buffer space since |
| // we use the string before the coded output stream is |
| // destructed. |
| writer.Trim(); |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(output), stringify(outputContentType)), |
| None()); |
| } |
| |
| case ContentType::JSON: { |
| string body = jsonify([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Response::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Response::kTypeFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| v1::master::Response::Type_Name( |
| v1::master::Response::GET_STATE)); |
| |
| field = v1::master::Response::kGetStateFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetState(approvers)); |
| }); |
| |
| // TODO(bmahler): Pass jsonp query parameter through here. |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| OK(std::move(body), stringify(outputContentType)), |
| None()); |
| } |
| |
| default: |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| NotAcceptable("Request must accept json or protobuf"), None()); |
| } |
| } |
| |
| |
| pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>> |
| Master::ReadOnlyHandler::subscribe( |
| ContentType outputContentType, |
| const hashmap<std::string, std::string>& query, |
| const process::Owned<ObjectApprovers>& approvers) const |
| { |
| process::http::Pipe pipe; |
| OK ok; |
| |
| ok.headers["Content-Type"] = stringify(outputContentType); |
| ok.type = process::http::Response::PIPE; |
| ok.reader = pipe.reader(); |
| |
| StreamingHttpConnection<v1::master::Event> http( |
| pipe.writer(), outputContentType); |
| |
| // Serialize the following event: |
| // |
| // mesos::master::Event event; |
| // event.set_type(mesos::master::Event::SUBSCRIBED); |
| // *event.mutable_subscribed()->mutable_get_state() = |
| // _getState(approvers); |
| // event.mutable_subscribed()->set_heartbeat_interval_seconds( |
| // DEFAULT_HEARTBEAT_INTERVAL.secs()); |
| // |
| // http.send(event); |
| |
| switch (outputContentType) { |
| case ContentType::PROTOBUF: { |
| string serialized; |
| google::protobuf::io::StringOutputStream stream(&serialized); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteEnum( |
| mesos::v1::master::Event::kTypeFieldNumber, |
| mesos::v1::master::Event::SUBSCRIBED, |
| &writer); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Event::kSubscribedFieldNumber, |
| serializeSubscribe(approvers), |
| &writer); |
| |
| // We must manually trim the unused buffer space since |
| // we use the string before the coded output stream is |
| // destructed. |
| writer.Trim(); |
| |
| http.send(serialized); |
| |
| break; |
| } |
| |
| case ContentType::JSON: { |
| string serialized = jsonify([&](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Event::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Event::kTypeFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| v1::master::Event::Type_Name( |
| v1::master::Event::SUBSCRIBED)); |
| |
| field = v1::master::Event::kSubscribedFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifySubscribe(approvers)); |
| }); |
| |
| http.send(serialized); |
| |
| break; |
| } |
| |
| default: |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| NotAcceptable("Request must accept json or protobuf"), None()); |
| } |
| |
| mesos::master::Event heartbeatEvent; |
| heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT); |
| http.send(heartbeatEvent); |
| |
| // This new subscriber needs to be added in the post-processing step. |
| Master::ReadOnlyHandler::PostProcessing::Subscribe s = {approvers, http}; |
| |
| Master::ReadOnlyHandler::PostProcessing postProcessing = { std::move(s) }; |
| |
| return pair<Response, Option<Master::ReadOnlyHandler::PostProcessing>>( |
| ok, |
| std::move(postProcessing)); |
| } |
| |
| |
| function<void(JSON::ObjectWriter*)> Master::ReadOnlyHandler::jsonifySubscribe( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Jsonify the following message: |
| // |
| // mesos::master::Event::Subscribed subscribed; |
| // *subscribed.mutable_get_state() = _getState(approvers); |
| // subscribed.set_heartbeat_interval_seconds( |
| // DEFAULT_HEARTBEAT_INTERVAL.secs()); |
| |
| // TODO(bmahler): This copies the Owned object approvers. |
| return [=](JSON::ObjectWriter* writer) { |
| const google::protobuf::Descriptor* descriptor = |
| v1::master::Event::Subscribed::descriptor(); |
| |
| int field; |
| |
| field = v1::master::Event::Subscribed::kGetStateFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| jsonifyGetState(approvers)); |
| |
| field = v1::master::Event::Subscribed::kHeartbeatIntervalSecondsFieldNumber; |
| writer->field( |
| descriptor->FindFieldByNumber(field)->name(), |
| DEFAULT_HEARTBEAT_INTERVAL.secs()); |
| }; |
| } |
| |
| |
| string Master::ReadOnlyHandler::serializeSubscribe( |
| const Owned<ObjectApprovers>& approvers) const |
| { |
| // Serialize the following message: |
| // |
| // mesos::master::Event::Subscribed subscribed; |
| // *subscribed.mutable_get_state() = _getState(approvers); |
| // subscribed.set_heartbeat_interval_seconds( |
| // DEFAULT_HEARTBEAT_INTERVAL.secs()); |
| |
| string output; |
| google::protobuf::io::StringOutputStream stream(&output); |
| google::protobuf::io::CodedOutputStream writer(&stream); |
| |
| WireFormatLite::WriteBytes( |
| mesos::v1::master::Event::Subscribed::kGetStateFieldNumber, |
| serializeGetState(approvers), |
| &writer); |
| |
| WireFormatLite::WriteDouble( |
| mesos::v1::master::Event::Subscribed |
| ::kHeartbeatIntervalSecondsFieldNumber, |
| DEFAULT_HEARTBEAT_INTERVAL.secs(), |
| &writer); |
| |
| // While an explicit Trim() isn't necessary (since the coded |
| // output stream is destructed before the string is returned), |
| // it's a quite tricky bug to diagnose if Trim() is missed, so |
| // we always do it explicitly to signal the reader about this |
| // subtlety. |
| writer.Trim(); |
| |
| return output; |
| } |
| |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |