blob: 66d6160c5e951e6c3a51eb02c31ead066c322202 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include "master/master.hpp"
#include <string>
#include <vector>
#include <mesos/mesos.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/jsonify.hpp>
#include <stout/option.hpp>
#include <stout/representation.hpp>
#include "common/build.hpp"
#include "common/http.hpp"
using process::Owned;
using process::http::OK;
using mesos::authorization::VIEW_EXECUTOR;
using mesos::authorization::VIEW_FLAGS;
using mesos::authorization::VIEW_FRAMEWORK;
using mesos::authorization::VIEW_ROLE;
using mesos::authorization::VIEW_TASK;
using std::vector;
using std::string;
namespace mesos {
namespace internal {
namespace master {
// Pull in model overrides from common.
using mesos::internal::model;
// The summary representation of `T` to support the `/state-summary` endpoint.
// e.g., `Summary<Slave>`.
template <typename T>
struct Summary : Representation<T>
{
using Representation<T>::Representation;
};
// The full representation of `T` to support the `/state` endpoint.
// e.g., `Full<Slave>`.
template <typename T>
struct Full : Representation<T>
{
using Representation<T>::Representation;
};
// Filtered representation of Full<Framework>.
// Executors and Tasks are filtered based on whether the
// user is authorized to view them.
//
// TODO(bevers): Consider moving writers and other json-related
// code into a separate file.
struct FullFrameworkWriter {
FullFrameworkWriter(
const process::Owned<ObjectApprovers>& approvers,
const Framework* framework);
void operator()(JSON::ObjectWriter* writer) const;
const process::Owned<ObjectApprovers>& approvers_;
const Framework* framework_;
};
struct SlaveWriter
{
SlaveWriter(
const Slave& slave,
const process::Owned<ObjectApprovers>& approvers);
void operator()(JSON::ObjectWriter* writer) const;
const Slave& slave_;
const process::Owned<ObjectApprovers>& approvers_;
};
struct SlavesWriter
{
SlavesWriter(
const Master::Slaves& slaves,
const process::Owned<ObjectApprovers>& approvers,
const IDAcceptor<SlaveID>& selectSlaveId);
void operator()(JSON::ObjectWriter* writer) const;
void writeSlave(const Slave* slave, JSON::ObjectWriter* writer) const;
const Master::Slaves& slaves_;
const process::Owned<ObjectApprovers>& approvers_;
const IDAcceptor<SlaveID>& selectSlaveId_;
};
void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary);
FullFrameworkWriter::FullFrameworkWriter(
const Owned<ObjectApprovers>& approvers,
const Framework* framework)
: approvers_(approvers),
framework_(framework)
{}
void FullFrameworkWriter::operator()(JSON::ObjectWriter* writer) const
{
json(writer, Summary<Framework>(*framework_));
// Add additional fields to those generated by the
// `Summary<Framework>` overload.
writer->field("user", framework_->info.user());
writer->field("failover_timeout", framework_->info.failover_timeout());
writer->field("checkpoint", framework_->info.checkpoint());
writer->field("registered_time", framework_->registeredTime.secs());
writer->field("unregistered_time", framework_->unregisteredTime.secs());
if (framework_->info.has_principal()) {
writer->field("principal", framework_->info.principal());
}
// TODO(bmahler): Consider deprecating this in favor of the split
// used and offered resources added in `Summary<Framework>`.
writer->field(
"resources",
framework_->totalUsedResources + framework_->totalOfferedResources);
// TODO(benh): Consider making reregisteredTime an Option.
if (framework_->registeredTime != framework_->reregisteredTime) {
writer->field("reregistered_time", framework_->reregisteredTime.secs());
}
// For multi-role frameworks the `role` field will be unset.
// Note that we could set `roles` here for both cases, which
// would make tooling simpler (only need to look for `roles`).
// However, we opted to just mirror the protobuf akin to how
// generic protobuf -> JSON translation works.
if (framework_->capabilities.multiRole) {
writer->field("roles", framework_->info.roles());
} else {
writer->field("role", framework_->info.role());
}
// Model all of the tasks associated with a framework.
writer->field("tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (const TaskInfo& taskInfo, framework_->pendingTasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(taskInfo, framework_->info)) {
continue;
}
writer->element([this, &taskInfo](JSON::ObjectWriter* writer) {
writer->field("id", taskInfo.task_id().value());
writer->field("name", taskInfo.name());
writer->field("framework_id", framework_->id().value());
writer->field(
"executor_id",
taskInfo.executor().executor_id().value());
writer->field("slave_id", taskInfo.slave_id().value());
writer->field("state", TaskState_Name(TASK_STAGING));
writer->field("resources", taskInfo.resources());
// Tasks are not allowed to mix resources allocated to
// different roles, see MESOS-6636.
writer->field(
"role",
taskInfo.resources().begin()->allocation_info().role());
writer->field("statuses", std::initializer_list<TaskStatus>{});
if (taskInfo.has_labels()) {
writer->field("labels", taskInfo.labels());
}
if (taskInfo.has_discovery()) {
writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
}
if (taskInfo.has_container()) {
writer->field("container", JSON::Protobuf(taskInfo.container()));
}
});
}
foreachvalue (Task* task, framework_->tasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) {
foreach (const Owned<Task>& task, framework_->completedTasks) {
// Skip unauthorized tasks.
if (!approvers_->approved<VIEW_TASK>(*task, framework_->info)) {
continue;
}
writer->element(*task);
}
});
// Model all of the offers associated with a framework.
writer->field("offers", [this](JSON::ArrayWriter* writer) {
foreach (Offer* offer, framework_->offers) {
writer->element(*offer);
}
});
// Model all of the executors of a framework.
writer->field("executors", [this](JSON::ArrayWriter* writer) {
foreachpair (
const SlaveID& slaveId,
const auto& executorsMap,
framework_->executors) {
foreachvalue (const ExecutorInfo& executor, executorsMap) {
writer->element([this,
&executor,
&slaveId](JSON::ObjectWriter* writer) {
// Skip unauthorized executors.
if (!approvers_->approved<VIEW_EXECUTOR>(
executor, framework_->info)) {
return;
}
json(writer, executor);
writer->field("slave_id", slaveId.value());
});
}
}
});
// Model all of the labels associated with a framework.
if (framework_->info.has_labels()) {
writer->field("labels", framework_->info.labels());
}
}
SlaveWriter::SlaveWriter(
const Slave& slave,
const Owned<ObjectApprovers>& approvers)
: slave_(slave), approvers_(approvers)
{}
void SlaveWriter::operator()(JSON::ObjectWriter* writer) const
{
json(writer, slave_.info);
writer->field("pid", string(slave_.pid));
writer->field("registered_time", slave_.registeredTime.secs());
if (slave_.reregisteredTime.isSome()) {
writer->field("reregistered_time", slave_.reregisteredTime->secs());
}
const Resources& totalResources = slave_.totalResources;
writer->field("resources", totalResources);
writer->field("used_resources", Resources::sum(slave_.usedResources));
writer->field("offered_resources", slave_.offeredResources);
writer->field(
"reserved_resources",
[&totalResources, this](JSON::ObjectWriter* writer) {
foreachpair (const string& role, const Resources& reservation,
totalResources.reservations()) {
// TODO(arojas): Consider showing unapproved resources in an
// aggregated special field, so that all resource values add up
// MESOS-7779.
if (approvers_->approved<VIEW_ROLE>(role)) {
writer->field(role, reservation);
}
}
});
writer->field("unreserved_resources", totalResources.unreserved());
writer->field("active", slave_.active);
writer->field("version", slave_.version);
writer->field("capabilities", slave_.capabilities.toRepeatedPtrField());
}
SlavesWriter::SlavesWriter(
const Master::Slaves& slaves,
const Owned<ObjectApprovers>& approvers,
const IDAcceptor<SlaveID>& selectSlaveId)
: slaves_(slaves), approvers_(approvers), selectSlaveId_(selectSlaveId)
{}
void SlavesWriter::operator()(JSON::ObjectWriter* writer) const
{
writer->field("slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Slave* slave, slaves_.registered) {
if (!selectSlaveId_.accept(slave->id)) {
continue;
}
writer->element([this, &slave](JSON::ObjectWriter* writer) {
writeSlave(slave, writer);
});
}
});
writer->field("recovered_slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const SlaveInfo& slaveInfo, slaves_.recovered) {
if (!selectSlaveId_.accept(slaveInfo.id())) {
continue;
}
writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
json(writer, slaveInfo);
});
}
});
}
void SlavesWriter::writeSlave(
const Slave* slave, JSON::ObjectWriter* writer) const
{
SlaveWriter(*slave, approvers_)(writer);
// Add the complete protobuf->JSON for all used, reserved,
// and offered resources. The other endpoints summarize
// resource information, which omits the details of
// reservations and persistent volumes. Full resource
// information is necessary so that operators can use the
// `/unreserve` and `/destroy-volumes` endpoints.
hashmap<string, Resources> reserved = slave->totalResources.reservations();
writer->field(
"reserved_resources_full",
[&reserved, this](JSON::ObjectWriter* writer) {
foreachpair (const string& role,
const Resources& resources,
reserved) {
if (approvers_->approved<VIEW_ROLE>(role)) {
writer->field(role, [&resources, this](
JSON::ArrayWriter* writer) {
foreach (Resource resource, resources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
}
}
});
Resources unreservedResources = slave->totalResources.unreserved();
writer->field(
"unreserved_resources_full",
[&unreservedResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, unreservedResources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
Resources usedResources = Resources::sum(slave->usedResources);
writer->field(
"used_resources_full",
[&usedResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, usedResources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
const Resources& offeredResources = slave->offeredResources;
writer->field(
"offered_resources_full",
[&offeredResources, this](JSON::ArrayWriter* writer) {
foreach (Resource resource, offeredResources) {
if (approvers_->approved<VIEW_ROLE>(resource)) {
convertResourceFormat(&resource, ENDPOINT);
writer->element(JSON::Protobuf(resource));
}
}
});
}
void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary)
{
const Framework& framework = summary;
writer->field("id", framework.id().value());
writer->field("name", framework.info.name());
// Omit pid for http frameworks.
if (framework.pid.isSome()) {
writer->field("pid", string(framework.pid.get()));
}
// TODO(bmahler): Use these in the webui.
writer->field("used_resources", framework.totalUsedResources);
writer->field("offered_resources", framework.totalOfferedResources);
writer->field("capabilities", framework.info.capabilities());
writer->field("hostname", framework.info.hostname());
writer->field("webui_url", framework.info.webui_url());
writer->field("active", framework.active());
writer->field("connected", framework.connected());
writer->field("recovered", framework.recovered());
}
// This abstraction has no side-effects. It factors out computing the
// mapping from 'slaves' to 'frameworks' to answer the questions 'what
// frameworks are running on a given slave?' and 'what slaves are
// running the given framework?'.
class SlaveFrameworkMapping
{
public:
SlaveFrameworkMapping(const hashmap<FrameworkID, Framework*>& frameworks)
{
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
frameworksToSlaves[frameworkId].insert(taskInfo.slave_id());
slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId);
}
foreachvalue (const Task* task, framework->tasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
foreach (const Owned<Task>& task, framework->completedTasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
}
}
const hashset<FrameworkID>& frameworks(const SlaveID& slaveId) const
{
const auto iterator = slavesToFrameworks.find(slaveId);
return iterator != slavesToFrameworks.end() ?
iterator->second : hashset<FrameworkID>::EMPTY;
}
const hashset<SlaveID>& slaves(const FrameworkID& frameworkId) const
{
const auto iterator = frameworksToSlaves.find(frameworkId);
return iterator != frameworksToSlaves.end() ?
iterator->second : hashset<SlaveID>::EMPTY;
}
private:
hashmap<SlaveID, hashset<FrameworkID>> slavesToFrameworks;
hashmap<FrameworkID, hashset<SlaveID>> frameworksToSlaves;
};
// This abstraction has no side-effects. It factors out the accounting
// for a 'TaskState' summary. We use this to summarize 'TaskState's
// for both frameworks as well as slaves.
struct TaskStateSummary
{
// TODO(jmlvanre): Possibly clean this up as per MESOS-2694.
const static TaskStateSummary EMPTY;
TaskStateSummary()
: staging(0),
starting(0),
running(0),
killing(0),
finished(0),
killed(0),
failed(0),
lost(0),
error(0),
dropped(0),
unreachable(0),
gone(0),
gone_by_operator(0),
unknown(0) {}
// Account for the state of the given task.
void count(const Task& task)
{
switch (task.state()) {
case TASK_STAGING: { ++staging; break; }
case TASK_STARTING: { ++starting; break; }
case TASK_RUNNING: { ++running; break; }
case TASK_KILLING: { ++killing; break; }
case TASK_FINISHED: { ++finished; break; }
case TASK_KILLED: { ++killed; break; }
case TASK_FAILED: { ++failed; break; }
case TASK_LOST: { ++lost; break; }
case TASK_ERROR: { ++error; break; }
case TASK_DROPPED: { ++dropped; break; }
case TASK_UNREACHABLE: { ++unreachable; break; }
case TASK_GONE: { ++gone; break; }
case TASK_GONE_BY_OPERATOR: { ++gone_by_operator; break; }
case TASK_UNKNOWN: { ++unknown; break; }
// No default case allows for a helpful compiler error if we
// introduce a new state.
}
}
size_t staging;
size_t starting;
size_t running;
size_t killing;
size_t finished;
size_t killed;
size_t failed;
size_t lost;
size_t error;
size_t dropped;
size_t unreachable;
size_t gone;
size_t gone_by_operator;
size_t unknown;
};
const TaskStateSummary TaskStateSummary::EMPTY;
// This abstraction has no side-effects. It factors out computing the
// 'TaskState' summaries for frameworks and slaves. This answers the
// questions 'How many tasks are in each state for a given framework?'
// and 'How many tasks are in each state for a given slave?'.
class TaskStateSummaries
{
public:
TaskStateSummaries(const hashmap<FrameworkID, Framework*>& frameworks)
{
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
frameworkTaskSummaries[frameworkId].staging++;
slaveTaskSummaries[taskInfo.slave_id()].staging++;
}
foreachvalue (const Task* task, framework->tasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
foreach (const Owned<Task>& task, framework->completedTasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
}
}
const TaskStateSummary& framework(const FrameworkID& frameworkId) const
{
const auto iterator = frameworkTaskSummaries.find(frameworkId);
return iterator != frameworkTaskSummaries.end() ?
iterator->second : TaskStateSummary::EMPTY;
}
const TaskStateSummary& slave(const SlaveID& slaveId) const
{
const auto iterator = slaveTaskSummaries.find(slaveId);
return iterator != slaveTaskSummaries.end() ?
iterator->second : TaskStateSummary::EMPTY;
}
private:
hashmap<FrameworkID, TaskStateSummary> frameworkTaskSummaries;
hashmap<SlaveID, TaskStateSummary> slaveTaskSummaries;
};
process::http::Response Master::ReadOnlyHandler::frameworks(
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
IDAcceptor<FrameworkID> selectFrameworkId(
query.get("framework_id"));
// This lambda is consumed before the outer lambda
// returns, hence capture by reference is fine here.
const Master* master = this->master;
auto frameworks = [master, &approvers, &selectFrameworkId](
JSON::ObjectWriter* writer) {
// Model all of the frameworks.
writer->field(
"frameworks",
[master, &approvers, &selectFrameworkId](
JSON::ArrayWriter* writer) {
foreachvalue (
Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks or frameworks
// without a matching ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(FullFrameworkWriter(approvers, framework));
}
});
// Model all of the completed frameworks.
writer->field(
"completed_frameworks",
[master, &approvers, &selectFrameworkId](
JSON::ArrayWriter* writer) {
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks or frameworks
// without a matching ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(
FullFrameworkWriter(approvers, framework.get()));
}
});
// Unregistered frameworks are no longer possible. We emit an
// empty array for the sake of backward compatibility.
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
return OK(jsonify(frameworks), query.get("jsonp"));
}
// Returns a JSON object modeled after a role.
JSON::Object model(
const string& name,
Option<double> weight,
Option<Quota> quota,
Option<Role*> _role)
{
JSON::Object object;
object.values["name"] = name;
if (weight.isSome()) {
object.values["weight"] = weight.get();
} else {
object.values["weight"] = 1.0; // Default weight.
}
if (quota.isSome()) {
object.values["quota"] = model(quota->info);
}
if (_role.isNone()) {
object.values["resources"] = model(Resources());
object.values["frameworks"] = JSON::Array();
} else {
Role* role = _role.get();
object.values["resources"] = model(role->allocatedResources());
{
JSON::Array array;
foreachkey (const FrameworkID& frameworkId, role->frameworks) {
array.values.push_back(frameworkId.value());
}
object.values["frameworks"] = std::move(array);
}
}
return object;
}
process::http::Response Master::ReadOnlyHandler::roles(
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
JSON::Object object;
const vector<string> filteredRoles = master->filterRoles(approvers);
{
JSON::Array array;
foreach (const string& name, filteredRoles) {
Option<double> weight = None();
if (master->weights.contains(name)) {
weight = master->weights.at(name);
}
Option<Quota> quota = None();
if (master->quotas.contains(name)) {
quota = master->quotas.at(name);
}
Option<Role*> role = None();
if (master->roles.contains(name)) {
role = master->roles.at(name);
}
array.values.push_back(model(name, weight, quota, role));
}
object.values["roles"] = std::move(array);
}
return OK(object, query.get("jsonp"));
}
process::http::Response Master::ReadOnlyHandler::slaves(
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
IDAcceptor<SlaveID> selectSlaveId(query.get("slave_id"));
return process::http::OK(
jsonify(SlavesWriter(master->slaves, approvers, selectSlaveId)),
query.get("jsonp"));
}
process::http::Response Master::ReadOnlyHandler::state(
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
const Master* master = this->master;
auto calculateState = [master, &approvers](JSON::ObjectWriter* writer) {
writer->field("version", MESOS_VERSION);
if (build::GIT_SHA.isSome()) {
writer->field("git_sha", build::GIT_SHA.get());
}
if (build::GIT_BRANCH.isSome()) {
writer->field("git_branch", build::GIT_BRANCH.get());
}
if (build::GIT_TAG.isSome()) {
writer->field("git_tag", build::GIT_TAG.get());
}
writer->field("build_date", build::DATE);
writer->field("build_time", build::TIME);
writer->field("build_user", build::USER);
writer->field("start_time", master->startTime.secs());
if (master->electedTime.isSome()) {
writer->field("elected_time", master->electedTime->secs());
}
writer->field("id", master->info().id());
writer->field("pid", string(master->self()));
writer->field("hostname", master->info().hostname());
writer->field("capabilities", master->info().capabilities());
writer->field("activated_slaves", master->_const_slaves_active());
writer->field("deactivated_slaves", master->_const_slaves_inactive());
writer->field("unreachable_slaves", master->_const_slaves_unreachable());
if (master->info().has_domain()) {
writer->field("domain", master->info().domain());
}
// TODO(haosdent): Deprecated this in favor of `leader_info` below.
if (master->leader.isSome()) {
writer->field("leader", master->leader->pid());
}
if (master->leader.isSome()) {
writer->field("leader_info", [master](JSON::ObjectWriter* writer) {
json(writer, master->leader.get());
});
}
if (approvers->approved<VIEW_FLAGS>()) {
if (master->flags.cluster.isSome()) {
writer->field("cluster", master->flags.cluster.get());
}
if (master->flags.log_dir.isSome()) {
writer->field("log_dir", master->flags.log_dir.get());
}
if (master->flags.external_log_file.isSome()) {
writer->field("external_log_file",
master->flags.external_log_file.get());
}
writer->field("flags", [master](JSON::ObjectWriter* writer) {
foreachvalue (const flags::Flag& flag, master->flags) {
Option<string> value = flag.stringify(master->flags);
if (value.isSome()) {
writer->field(flag.effective_name().value, value.get());
}
}
});
}
// Model all of the registered slaves.
writer->field(
"slaves",
[master, &approvers](JSON::ArrayWriter* writer) {
foreachvalue (Slave* slave, master->slaves.registered) {
writer->element(SlaveWriter(*slave, approvers));
}
});
// Model all of the recovered slaves.
writer->field(
"recovered_slaves",
[master](JSON::ArrayWriter* writer) {
foreachvalue (
const SlaveInfo& slaveInfo, master->slaves.recovered) {
writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
json(writer, slaveInfo);
});
}
});
// Model all of the frameworks.
writer->field(
"frameworks",
[master, &approvers](JSON::ArrayWriter* writer) {
foreachvalue (
Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(FullFrameworkWriter(approvers, framework));
}
});
// Model all of the completed frameworks.
writer->field(
"completed_frameworks",
[master, &approvers](JSON::ArrayWriter* writer) {
foreachvalue (
const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(
FullFrameworkWriter(approvers, framework.get()));
}
});
// Orphan tasks are no longer possible. We emit an empty array
// for the sake of backward compatibility.
writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
// Unregistered frameworks are no longer possible. We emit an
// empty array for the sake of backward compatibility.
writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
return OK(jsonify(calculateState), query.get("jsonp"));
}
process::http::Response Master::ReadOnlyHandler::stateSummary(
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
const Master* master = this->master;
auto stateSummary = [master, &approvers](JSON::ObjectWriter* writer) {
writer->field("hostname", master->info().hostname());
if (master->flags.cluster.isSome()) {
writer->field("cluster", master->flags.cluster.get());
}
// We use the tasks in the 'Frameworks' struct to compute summaries
// for this endpoint. This is done 1) for consistency between the
// 'slaves' and 'frameworks' subsections below 2) because we want to
// provide summary information for frameworks that are currently
// registered 3) the frameworks keep a circular buffer of completed
// tasks that we can use to keep a limited view on the history of
// recent completed / failed tasks.
// Generate mappings from 'slave' to 'framework' and reverse.
SlaveFrameworkMapping slaveFrameworkMapping(
master->frameworks.registered);
// Generate 'TaskState' summaries for all framework and slave ids.
TaskStateSummaries taskStateSummaries(
master->frameworks.registered);
// Model all of the slaves.
writer->field(
"slaves",
[master,
&slaveFrameworkMapping,
&taskStateSummaries,
&approvers](JSON::ArrayWriter* writer) {
foreachvalue (Slave* slave, master->slaves.registered) {
writer->element(
[&slave,
&slaveFrameworkMapping,
&taskStateSummaries,
&approvers](JSON::ObjectWriter* writer) {
SlaveWriter slaveWriter(*slave, approvers);
slaveWriter(writer);
// Add the 'TaskState' summary for this slave.
const TaskStateSummary& summary =
taskStateSummaries.slave(slave->id);
// Certain per-agent status totals will always be zero
// (e.g., TASK_ERROR, TASK_UNREACHABLE). We report
// them here anyway, for completeness.
//
// TODO(neilc): Update for TASK_GONE and
// TASK_GONE_BY_OPERATOR.
writer->field("TASK_STAGING", summary.staging);
writer->field("TASK_STARTING", summary.starting);
writer->field("TASK_RUNNING", summary.running);
writer->field("TASK_KILLING", summary.killing);
writer->field("TASK_FINISHED", summary.finished);
writer->field("TASK_KILLED", summary.killed);
writer->field("TASK_FAILED", summary.failed);
writer->field("TASK_LOST", summary.lost);
writer->field("TASK_ERROR", summary.error);
writer->field(
"TASK_UNREACHABLE",
summary.unreachable);
// Add the ids of all the frameworks running on this
// slave.
const hashset<FrameworkID>& frameworks =
slaveFrameworkMapping.frameworks(slave->id);
writer->field(
"framework_ids",
[&frameworks](JSON::ArrayWriter* writer) {
foreach (
const FrameworkID& frameworkId,
frameworks) {
writer->element(frameworkId.value());
}
});
});
}
});
// Model all of the frameworks.
writer->field(
"frameworks",
[master,
&slaveFrameworkMapping,
&taskStateSummaries,
&approvers](JSON::ArrayWriter* writer) {
foreachpair (const FrameworkID& frameworkId,
Framework* framework,
master->frameworks.registered) {
// Skip unauthorized frameworks.
if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
writer->element(
[&frameworkId,
&framework,
&slaveFrameworkMapping,
&taskStateSummaries](JSON::ObjectWriter* writer) {
json(writer, Summary<Framework>(*framework));
// Add the 'TaskState' summary for this framework.
const TaskStateSummary& summary =
taskStateSummaries.framework(frameworkId);
// TODO(neilc): Update for TASK_GONE and
// TASK_GONE_BY_OPERATOR.
writer->field("TASK_STAGING", summary.staging);
writer->field("TASK_STARTING", summary.starting);
writer->field("TASK_RUNNING", summary.running);
writer->field("TASK_KILLING", summary.killing);
writer->field("TASK_FINISHED", summary.finished);
writer->field("TASK_KILLED", summary.killed);
writer->field("TASK_FAILED", summary.failed);
writer->field("TASK_LOST", summary.lost);
writer->field("TASK_ERROR", summary.error);
writer->field(
"TASK_UNREACHABLE",
summary.unreachable);
// Add the ids of all the slaves running
// this framework.
const hashset<SlaveID>& slaves =
slaveFrameworkMapping.slaves(frameworkId);
writer->field(
"slave_ids",
[&slaves](JSON::ArrayWriter* writer) {
foreach (const SlaveID& slaveId, slaves) {
writer->element(slaveId.value());
}
});
});
}
});
};
return OK(jsonify(stateSummary), query.get("jsonp"));
}
struct TaskComparator
{
static bool ascending(const Task* lhs, const Task* rhs)
{
size_t lhsSize = lhs->statuses().size();
size_t rhsSize = rhs->statuses().size();
if ((lhsSize == 0) && (rhsSize == 0)) {
return false;
}
if (lhsSize == 0) {
return true;
}
if (rhsSize == 0) {
return false;
}
return (lhs->statuses(0).timestamp() < rhs->statuses(0).timestamp());
}
static bool descending(const Task* lhs, const Task* rhs)
{
size_t lhsSize = lhs->statuses().size();
size_t rhsSize = rhs->statuses().size();
if ((lhsSize == 0) && (rhsSize == 0)) {
return false;
}
if (rhsSize == 0) {
return true;
}
if (lhsSize == 0) {
return false;
}
return (lhs->statuses(0).timestamp() > rhs->statuses(0).timestamp());
}
};
process::http::Response Master::ReadOnlyHandler::tasks(
const hashmap<std::string, std::string>& query,
const process::Owned<ObjectApprovers>& approvers) const
{
// Get list options (limit and offset).
Result<int> result = numify<int>(query.get("limit"));
size_t limit = result.isSome() ? result.get() : TASK_LIMIT;
result = numify<int>(query.get("offset"));
size_t offset = result.isSome() ? result.get() : 0;
Option<string> order = query.get("order");
string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des";
Option<string> frameworkId = query.get("framework_id");
Option<string> taskId = query.get("task_id");
IDAcceptor<FrameworkID> selectFrameworkId(frameworkId);
IDAcceptor<TaskID> selectTaskId(taskId);
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (Framework* framework, master->frameworks.registered) {
// Skip unauthorized frameworks or frameworks without matching
// framework ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
frameworks.push_back(framework);
}
foreachvalue (const Owned<Framework>& framework,
master->frameworks.completed) {
// Skip unauthorized frameworks or frameworks without matching
// framework ID.
if (!selectFrameworkId.accept(framework->id()) ||
!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
continue;
}
frameworks.push_back(framework.get());
}
// Construct task list with both running,
// completed and unreachable tasks.
vector<const Task*> tasks;
foreach (const Framework* framework, frameworks) {
foreachvalue (Task* task, framework->tasks) {
CHECK_NOTNULL(task);
// Skip unauthorized tasks or tasks without matching task ID.
if (!selectTaskId.accept(task->task_id()) ||
!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
tasks.push_back(task);
}
foreachvalue (
const Owned<Task>& task,
framework->unreachableTasks) {
// Skip unauthorized tasks or tasks without matching task ID.
if (!selectTaskId.accept(task->task_id()) ||
!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
tasks.push_back(task.get());
}
foreach (const Owned<Task>& task, framework->completedTasks) {
// Skip unauthorized tasks or tasks without matching task ID.
if (!selectTaskId.accept(task->task_id()) ||
!approvers->approved<VIEW_TASK>(*task, framework->info)) {
continue;
}
tasks.push_back(task.get());
}
}
// Sort tasks by task status timestamp. Default order is descending.
// The earliest timestamp is chosen for comparison when
// multiple are present.
if (_order == "asc") {
sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
} else {
sort(tasks.begin(), tasks.end(), TaskComparator::descending);
}
auto tasksWriter =
[&tasks, limit, offset](JSON::ObjectWriter* writer) {
writer->field(
"tasks",
[&tasks, limit, offset](JSON::ArrayWriter* writer) {
// Collect 'limit' number of tasks starting from 'offset'.
size_t end = std::min(offset + limit, tasks.size());
for (size_t i = offset; i < end; i++) {
writer->element(*tasks[i]);
}
});
};
return OK(jsonify(tasksWriter), query.get("jsonp"));
}
} // namespace master {
} // namespace internal {
} // namespace mesos {