// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <iomanip>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

#include <mesos/attributes.hpp>
#include <mesos/type_utils.hpp>

#include <mesos/authorizer/authorizer.hpp>

#include <mesos/maintenance/maintenance.hpp>

#include <mesos/master/master.hpp>

#include <mesos/v1/master/master.hpp>

#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/help.hpp>
#include <process/logging.hpp>

#include <process/metrics/metrics.hpp>

#include <stout/base64.hpp>
#include <stout/errorbase.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/jsonify.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/none.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/representation.hpp>
#include <stout/result.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>

#include "common/build.hpp"
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"

#include "internal/devolve.hpp"

#include "logging/logging.hpp"

#include "master/machine.hpp"
#include "master/maintenance.hpp"
#include "master/master.hpp"
#include "master/validation.hpp"

#include "mesos/mesos.hpp"
#include "mesos/resources.hpp"

#include "version/version.hpp"

using google::protobuf::RepeatedPtrField;

using process::AUTHENTICATION;
using process::AUTHORIZATION;
using process::Clock;
using process::DESCRIPTION;
using process::Failure;
using process::Future;
using process::HELP;
using process::Logging;
using process::TLDR;

using process::http::Accepted;
using process::http::BadRequest;
using process::http::Conflict;
using process::http::Forbidden;
using process::http::InternalServerError;
using process::http::MethodNotAllowed;
using process::http::NotFound;
using process::http::NotImplemented;
using process::http::NotAcceptable;
using process::http::OK;
using process::http::Pipe;
using process::http::ServiceUnavailable;
using process::http::TemporaryRedirect;
using process::http::UnsupportedMediaType;
using process::http::URL;

using process::http::authentication::Principal;

using std::copy_if;
using std::list;
using std::map;
using std::set;
using std::string;
using std::tie;
using std::tuple;
using std::vector;


namespace mesos {

using mesos::authorization::createSubject;

// Serializes a framework capability as a JSON string containing the
// protobuf enum name of the capability's type.
static void json(
    JSON::StringWriter* writer,
    const FrameworkInfo::Capability& capability)
{
  const string& typeName =
    FrameworkInfo::Capability::Type_Name(capability.type());

  writer->append(typeName);
}


// Serializes an agent capability as a JSON string containing the
// protobuf enum name of the capability's type.
static void json(
    JSON::StringWriter* writer,
    const SlaveInfo::Capability& capability)
{
  const string& typeName =
    SlaveInfo::Capability::Type_Name(capability.type());

  writer->append(typeName);
}


// Writes the JSON object fields describing an `Offer`: its own id, the
// ids of the framework and agent it refers to, its allocation info and
// the offered resources.
static void json(JSON::ObjectWriter* writer, const Offer& offer)
{
  const string& offerId = offer.id().value();
  const string& frameworkId = offer.framework_id().value();
  const string& slaveId = offer.slave_id().value();

  // NOTE: Fields are emitted in a fixed order; keep it stable.
  writer->field("id", offerId);
  writer->field("framework_id", frameworkId);
  writer->field("allocation_info", JSON::Protobuf(offer.allocation_info()));
  writer->field("slave_id", slaveId);
  writer->field("resources", Resources(offer.resources()));
}


// Writes the JSON object fields describing a `MasterInfo`. The
// `domain` field is only emitted when the master has a domain set.
static void json(JSON::ObjectWriter* writer, const MasterInfo& info)
{
  writer->field("id", info.id());
  writer->field("pid", info.pid());
  writer->field("port", info.port());
  writer->field("hostname", info.hostname());

  // The domain is optional; nothing more to write without it.
  if (!info.has_domain()) {
    return;
  }

  writer->field("domain", info.domain());
}


// Writes the JSON object fields describing a `SlaveInfo`. The
// `domain` field is only emitted when the agent has a domain set.
static void json(JSON::ObjectWriter* writer, const SlaveInfo& slaveInfo)
{
  const string& slaveId = slaveInfo.id().value();

  writer->field("id", slaveId);
  writer->field("hostname", slaveInfo.hostname());
  writer->field("port", slaveInfo.port());
  writer->field("attributes", Attributes(slaveInfo.attributes()));

  // The domain is optional; nothing more to write without it.
  if (!slaveInfo.has_domain()) {
    return;
  }

  writer->field("domain", slaveInfo.domain());
}

namespace internal {
namespace master {

// Pull in model overrides from common.
using mesos::internal::model;

// Pull in definitions from process.
using process::http::Response;
using process::http::Request;
using process::Owned;


// The summary representation of `T` to support the `/state-summary` endpoint.
// e.g., `Summary<Slave>`.
//
// This is a tag type: it adds no state or behavior of its own and only
// inherits the constructors of `Representation<T>`, so that a distinct
// `json()` overload can be selected for the summarized form of `T`.
template <typename T>
struct Summary : Representation<T>
{
  // Inherit all `Representation<T>` constructors.
  using Representation<T>::Representation;
};


// The full representation of `T` to support the `/state` endpoint.
// e.g., `Full<Slave>`.
//
// This is a tag type: it adds no state or behavior of its own and only
// inherits the constructors of `Representation<T>`, so that a distinct
// `json()` overload can be selected for the full form of `T`.
template <typename T>
struct Full : Representation<T>
{
  // Inherit all `Representation<T>` constructors.
  using Representation<T>::Representation;
};


// Forward declaration for `FullFrameworkWriter`, which calls this
// overload to emit the summary fields before adding its own; the
// definition appears further down in this file.
static void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary);


// Filtered representation of Full<Framework>.
// Executors and Tasks are filtered based on whether the
// user is authorized to view them.
struct FullFrameworkWriter {
  FullFrameworkWriter(
      const Owned<AuthorizationAcceptor>& authorizeTask,
      const Owned<AuthorizationAcceptor>& authorizeExecutorInfo,
      const Framework* framework)
    : authorizeTask_(authorizeTask),
      authorizeExecutorInfo_(authorizeExecutorInfo),
      framework_(framework) {}

  void operator()(JSON::ObjectWriter* writer) const
  {
    json(writer, Summary<Framework>(*framework_));

    // Add additional fields to those generated by the
    // `Summary<Framework>` overload.
    writer->field("user", framework_->info.user());
    writer->field("failover_timeout", framework_->info.failover_timeout());
    writer->field("checkpoint", framework_->info.checkpoint());
    writer->field("registered_time", framework_->registeredTime.secs());
    writer->field("unregistered_time", framework_->unregisteredTime.secs());

    if (framework_->info.has_principal()) {
      writer->field("principal", framework_->info.principal());
    }

    // TODO(bmahler): Consider deprecating this in favor of the split
    // used and offered resources added in `Summary<Framework>`.
    writer->field(
        "resources",
        framework_->totalUsedResources + framework_->totalOfferedResources);

    // TODO(benh): Consider making reregisteredTime an Option.
    if (framework_->registeredTime != framework_->reregisteredTime) {
      writer->field("reregistered_time", framework_->reregisteredTime.secs());
    }

    // For multi-role frameworks the `role` field will be unset.
    // Note that we could set `roles` here for both cases, which
    // would make tooling simpler (only need to look for `roles`).
    // However, we opted to just mirror the protobuf akin to how
    // generic protobuf -> JSON translation works.
    if (framework_->capabilities.multiRole) {
      writer->field("roles", framework_->info.roles());
    } else {
      writer->field("role", framework_->info.role());
    }

    // Model all of the tasks associated with a framework.
    writer->field("tasks", [this](JSON::ArrayWriter* writer) {
      foreachvalue (const TaskInfo& taskInfo, framework_->pendingTasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(taskInfo, framework_->info)) {
          continue;
        }

        writer->element([this, &taskInfo](JSON::ObjectWriter* writer) {
          writer->field("id", taskInfo.task_id().value());
          writer->field("name", taskInfo.name());
          writer->field("framework_id", framework_->id().value());

          writer->field(
              "executor_id",
              taskInfo.executor().executor_id().value());

          writer->field("slave_id", taskInfo.slave_id().value());
          writer->field("state", TaskState_Name(TASK_STAGING));
          writer->field("resources", Resources(taskInfo.resources()));

          // Tasks are not allowed to mix resources allocated to
          // different roles, see MESOS-6636.
          writer->field(
              "role",
              taskInfo.resources().begin()->allocation_info().role());

          writer->field("statuses", std::initializer_list<TaskStatus>{});

          if (taskInfo.has_labels()) {
            writer->field("labels", taskInfo.labels());
          }

          if (taskInfo.has_discovery()) {
            writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
          }

          if (taskInfo.has_container()) {
            writer->field("container", JSON::Protobuf(taskInfo.container()));
          }
        });
      }

      foreachvalue (Task* task, framework_->tasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(*task, framework_->info)) {
          continue;
        }

        writer->element(*task);
      }
    });

    writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
      foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(*task.get(), framework_->info)) {
          continue;
        }

        writer->element(*task.get());
      }
    });

    writer->field("completed_tasks", [this](JSON::ArrayWriter* writer) {
      foreach (const Owned<Task>& task, framework_->completedTasks) {
        // Skip unauthorized tasks.
        if (!authorizeTask_->accept(*task.get(), framework_->info)) {
          continue;
        }

        writer->element(*task.get());
      }
    });

    // Model all of the offers associated with a framework.
    writer->field("offers", [this](JSON::ArrayWriter* writer) {
      foreach (Offer* offer, framework_->offers) {
        writer->element(*offer);
      }
    });

    // Model all of the executors of a framework.
    writer->field("executors", [this](JSON::ArrayWriter* writer) {
      foreachpair (
          const SlaveID& slaveId,
          const auto& executorsMap,
          framework_->executors) {
        foreachvalue (const ExecutorInfo& executor, executorsMap) {
          writer->element([this,
                           &executor,
                           &slaveId](JSON::ObjectWriter* writer) {
            // Skip unauthorized executors.
            if (!authorizeExecutorInfo_->accept(executor, framework_->info)) {
              return;
            }

            json(writer, executor);
            writer->field("slave_id", slaveId.value());
          });
        }
      }
    });

    // Model all of the labels associated with a framework.
    if (framework_->info.has_labels()) {
      writer->field("labels", framework_->info.labels());
    }
  }

  const Owned<AuthorizationAcceptor>& authorizeTask_;
  const Owned<AuthorizationAcceptor>& authorizeExecutorInfo_;
  const Framework* framework_;
};


struct SlaveWriter
{
  SlaveWriter(
      const Slave& slave,
      const Owned<AuthorizationAcceptor>& authorizeRole)
    : slave_(slave), authorizeRole_(authorizeRole) {}

  void operator()(JSON::ObjectWriter* writer) const
  {
    json(writer, slave_.info);

    writer->field("pid", string(slave_.pid));
    writer->field("registered_time", slave_.registeredTime.secs());

    if (slave_.reregisteredTime.isSome()) {
      writer->field("reregistered_time", slave_.reregisteredTime.get().secs());
    }

    const Resources& totalResources = slave_.totalResources;
    writer->field("resources", totalResources);
    writer->field("used_resources", Resources::sum(slave_.usedResources));
    writer->field("offered_resources", slave_.offeredResources);
    writer->field(
        "reserved_resources",
        [&totalResources, this](JSON::ObjectWriter* writer) {
          foreachpair (const string& role, const Resources& reservation,
                       totalResources.reservations()) {
            // TODO(arojas): Consider showing unapproved resources in an
            // aggregated special field, so that all resource values add up
            // MESOS-7779.
            if (authorizeRole_->accept(role)) {
              writer->field(role, reservation);
            }
          }
        });
    writer->field("unreserved_resources", totalResources.unreserved());

    writer->field("active", slave_.active);
    writer->field("version", slave_.version);
    writer->field("capabilities", slave_.capabilities.toRepeatedPtrField());
  }

  const Slave& slave_;
  const Owned<AuthorizationAcceptor>& authorizeRole_;
};


struct SlavesWriter
{
  SlavesWriter(
      const Master::Slaves& slaves,
      const Owned<AuthorizationAcceptor>& authorizeRole,
      const IDAcceptor<SlaveID>& selectSlaveId)
    : slaves_(slaves),
      authorizeRole_(authorizeRole),
      selectSlaveId_(selectSlaveId) {}

  void operator()(JSON::ObjectWriter* writer) const
  {
    writer->field("slaves", [this](JSON::ArrayWriter* writer) {
      foreachvalue (const Slave* slave, slaves_.registered) {
        if (!selectSlaveId_.accept(slave->id)) {
          continue;
        }

        writer->element([this, &slave](JSON::ObjectWriter* writer) {
          writeSlave(slave, writer);
        });
      }
    });

    writer->field("recovered_slaves", [this](JSON::ArrayWriter* writer) {
      foreachvalue (const SlaveInfo& slaveInfo, slaves_.recovered) {
        if (!selectSlaveId_.accept(slaveInfo.id())) {
          continue;
        }

        writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
          json(writer, slaveInfo);
        });
      }
    });
  }

  void writeSlave(const Slave* slave, JSON::ObjectWriter* writer) const
  {
    SlaveWriter(*slave, authorizeRole_)(writer);

    // Add the complete protobuf->JSON for all used, reserved,
    // and offered resources. The other endpoints summarize
    // resource information, which omits the details of
    // reservations and persistent volumes. Full resource
    // information is necessary so that operators can use the
    // `/unreserve` and `/destroy-volumes` endpoints.

    hashmap<string, Resources> reserved = slave->totalResources.reservations();

    writer->field(
        "reserved_resources_full",
        [&reserved, this](JSON::ObjectWriter* writer) {
          foreachpair (const string& role,
                       const Resources& resources,
                       reserved) {
            if (authorizeRole_->accept(role)) {
              writer->field(role, [&resources, this](
                  JSON::ArrayWriter* writer) {
                foreach (Resource resource, resources) {
                  if (authorizeResource(resource, authorizeRole_)) {
                    convertResourceFormat(&resource, ENDPOINT);
                    writer->element(JSON::Protobuf(resource));
                  }
                }
              });
            }
          }
        });

    Resources unreservedResources = slave->totalResources.unreserved();

    writer->field(
        "unreserved_resources_full",
        [&unreservedResources, this](JSON::ArrayWriter* writer) {
          foreach (Resource resource, unreservedResources) {
            if (authorizeResource(resource, authorizeRole_)) {
              convertResourceFormat(&resource, ENDPOINT);
              writer->element(JSON::Protobuf(resource));
            }
          }
        });

    Resources usedResources = Resources::sum(slave->usedResources);

    writer->field(
        "used_resources_full",
        [&usedResources, this](JSON::ArrayWriter* writer) {
          foreach (Resource resource, usedResources) {
            if (authorizeResource(resource, authorizeRole_)) {
              convertResourceFormat(&resource, ENDPOINT);
              writer->element(JSON::Protobuf(resource));
            }
          }
        });

    const Resources& offeredResources = slave->offeredResources;

    writer->field(
        "offered_resources_full",
        [&offeredResources, this](JSON::ArrayWriter* writer) {
          foreach (Resource resource, offeredResources) {
            if (authorizeResource(resource, authorizeRole_)) {
              convertResourceFormat(&resource, ENDPOINT);
              writer->element(JSON::Protobuf(resource));
            }
          }
        });
  }

  const Master::Slaves& slaves_;
  const Owned<AuthorizationAcceptor>& authorizeRole_;
  const IDAcceptor<SlaveID>& selectSlaveId_;
};


// Writes the summarized JSON object fields of a framework: identity,
// used / offered resources, capabilities and connection status.
static void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary)
{
  const Framework& framework = summary;
  const FrameworkInfo& info = framework.info;

  writer->field("id", framework.id().value());
  writer->field("name", info.name());

  // Omit pid for http frameworks.
  if (framework.pid.isSome()) {
    const string pid = string(framework.pid.get());
    writer->field("pid", pid);
  }

  // TODO(bmahler): Use these in the webui.
  writer->field("used_resources", framework.totalUsedResources);
  writer->field("offered_resources", framework.totalOfferedResources);

  writer->field("capabilities", info.capabilities());
  writer->field("hostname", info.hostname());
  writer->field("webui_url", info.webui_url());

  writer->field("active", framework.active());
  writer->field("connected", framework.connected());
  writer->field("recovered", framework.recovered());
}


// Returns the help text for the endpoint served by `Master::Http::api()`,
// assembled from a TL;DR, a description of the returned status codes,
// and the authentication / authorization notes.
string Master::Http::API_HELP()
{
  return HELP(
    TLDR(
        "Endpoint for API calls against the master."),
    DESCRIPTION(
        "Returns 200 OK when the request was processed successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "The information returned by this endpoint for certain calls",
        "might be filtered based on the user accessing it.",
        "For example a user might only see the subset of frameworks,",
        "tasks, and executors they are allowed to view.",
        "See the authorization documentation for details."));
}


// Entry point for `v1::master::Call` requests.
//
// The request is checked in the following order, and the first failing
// check determines the response:
//   1. A principal with claims but no value      -> Forbidden.
//   2. This master is not elected                -> result of `redirect()`.
//   3. Recovery has not finished                 -> ServiceUnavailable.
//   4. Method is not POST                        -> MethodNotAllowed.
//   5. 'Content-Type' missing / body unparsable  -> BadRequest.
//   6. 'Content-Type' neither JSON nor protobuf  -> UnsupportedMediaType.
//   7. Call validation fails                     -> BadRequest.
//   8. 'Accept' allows neither JSON nor protobuf -> NotAcceptable.
//
// Afterwards the call is dispatched to the handler matching its type.
Future<Response> Master::Http::api(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // TODO(vinod): Add metrics for rejected requests.

  // TODO(vinod): Add support for rate limiting.

  // When current master is not the leader, redirect to the leading master.
  // Note that this could happen when an operator, or some other
  // service, including a scheduler realizes this is the leading
  // master before the master itself realizes it, e.g., due to a
  // ZooKeeper watch delay.
  if (!master->elected()) {
    return redirect(request);
  }

  CHECK_SOME(master->recovered);

  if (!master->recovered.get().isReady()) {
    return ServiceUnavailable("Master has not finished recovery");
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  v1::master::Call v1Call;

  // TODO(anand): Content type values are case-insensitive.
  // NOTE: The header value is compared for exact equality below.
  Option<string> contentType = request.headers.get("Content-Type");

  if (contentType.isNone()) {
    return BadRequest("Expecting 'Content-Type' to be present");
  }

  // Deserialize the body according to the declared content type.
  if (contentType.get() == APPLICATION_PROTOBUF) {
    if (!v1Call.ParseFromString(request.body)) {
      return BadRequest("Failed to parse body into Call protobuf");
    }
  } else if (contentType.get() == APPLICATION_JSON) {
    Try<JSON::Value> value = JSON::parse(request.body);

    if (value.isError()) {
      return BadRequest("Failed to parse body into JSON: " + value.error());
    }

    Try<v1::master::Call> parse =
      ::protobuf::parse<v1::master::Call>(value.get());

    if (parse.isError()) {
      return BadRequest("Failed to convert JSON into Call protobuf: " +
                        parse.error());
    }

    v1Call = parse.get();
  } else {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' of ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
  }

  // Convert the versioned (v1) call into the internal representation;
  // everything below operates on the internal type.
  mesos::master::Call call = devolve(v1Call);

  Option<Error> error = validation::master::call::validate(call, principal);

  if (error.isSome()) {
    return BadRequest("Failed to validate master::Call: " +
                      error.get().message);
  }

  LOG(INFO) << "Processing call " << call.type();

  // Pick the response format. JSON is checked first, so it is chosen
  // whenever the client accepts both media types.
  ContentType acceptType;
  if (request.acceptsMediaType(APPLICATION_JSON)) {
    acceptType = ContentType::JSON;
  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
    acceptType = ContentType::PROTOBUF;
  } else {
    return NotAcceptable(
        string("Expecting 'Accept' to allow ") +
        "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
  }

  // Dispatch to the handler for the call type. The switch has no
  // `default` label and is followed by `UNREACHABLE()`.
  switch (call.type()) {
    case mesos::master::Call::UNKNOWN:
      return NotImplemented();

    case mesos::master::Call::GET_HEALTH:
      return getHealth(call, principal, acceptType);

    case mesos::master::Call::GET_FLAGS:
      return getFlags(call, principal, acceptType);

    case mesos::master::Call::GET_VERSION:
      return getVersion(call, principal, acceptType);

    case mesos::master::Call::GET_METRICS:
      return getMetrics(call, principal, acceptType);

    case mesos::master::Call::GET_LOGGING_LEVEL:
      return getLoggingLevel(call, principal, acceptType);

    case mesos::master::Call::SET_LOGGING_LEVEL:
      return setLoggingLevel(call, principal, acceptType);

    case mesos::master::Call::LIST_FILES:
      return listFiles(call, principal, acceptType);

    case mesos::master::Call::READ_FILE:
      return readFile(call, principal, acceptType);

    case mesos::master::Call::GET_STATE:
      return getState(call, principal, acceptType);

    case mesos::master::Call::GET_AGENTS:
      return getAgents(call, principal, acceptType);

    case mesos::master::Call::GET_FRAMEWORKS:
      return getFrameworks(call, principal, acceptType);

    case mesos::master::Call::GET_EXECUTORS:
      return getExecutors(call, principal, acceptType);

    case mesos::master::Call::GET_TASKS:
      return getTasks(call, principal, acceptType);

    case mesos::master::Call::GET_ROLES:
      return getRoles(call, principal, acceptType);

    case mesos::master::Call::GET_WEIGHTS:
      return weightsHandler.get(call, principal, acceptType);

    case mesos::master::Call::UPDATE_WEIGHTS:
      return weightsHandler.update(call, principal, acceptType);

    case mesos::master::Call::GET_MASTER:
      return getMaster(call, principal, acceptType);

    case mesos::master::Call::SUBSCRIBE:
      return subscribe(call, principal, acceptType);

    case mesos::master::Call::RESERVE_RESOURCES:
      return reserveResources(call, principal, acceptType);

    case mesos::master::Call::UNRESERVE_RESOURCES:
      return unreserveResources(call, principal, acceptType);

    case mesos::master::Call::CREATE_VOLUMES:
      return createVolumes(call, principal, acceptType);

    case mesos::master::Call::DESTROY_VOLUMES:
      return destroyVolumes(call, principal, acceptType);

    case mesos::master::Call::GET_MAINTENANCE_STATUS:
      return getMaintenanceStatus(call, principal, acceptType);

    case mesos::master::Call::GET_MAINTENANCE_SCHEDULE:
      return getMaintenanceSchedule(call, principal, acceptType);

    case mesos::master::Call::UPDATE_MAINTENANCE_SCHEDULE:
      return updateMaintenanceSchedule(call, principal, acceptType);

    case mesos::master::Call::START_MAINTENANCE:
      return startMaintenance(call, principal, acceptType);

    case mesos::master::Call::STOP_MAINTENANCE:
      return stopMaintenance(call, principal, acceptType);

    case mesos::master::Call::GET_QUOTA:
      return quotaHandler.status(call, principal, acceptType);

    // NOTE: The quota `set` / `remove` handlers are not passed the
    // negotiated `acceptType`, unlike the other handlers.
    case mesos::master::Call::SET_QUOTA:
      return quotaHandler.set(call, principal);

    case mesos::master::Call::REMOVE_QUOTA:
      return quotaHandler.remove(call, principal);
  }

  UNREACHABLE();
}


// Handles a `SUBSCRIBE` call: once all approvers / acceptors are
// available, registers a streaming connection with the master, sends
// a `SUBSCRIBED` event carrying a state snapshot on it, and returns a
// piped `200 OK` response.
Future<Response> Master::Http::subscribe(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::SUBSCRIBE, call.type());

  // Retrieve Approvers for authorizing frameworks and tasks.
  Future<Owned<ObjectApprover>> frameworksApprover;
  Future<Owned<ObjectApprover>> tasksApprover;
  Future<Owned<ObjectApprover>> executorsApprover;

  if (master->authorizer.isNone()) {
    // Without an authorizer every object is approved.
    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    frameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);

    tasksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_TASK);

    executorsApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_EXECUTOR);
  }

  Future<Owned<AuthorizationAcceptor>> rolesAcceptor =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_ROLE);

  return collect(
      frameworksApprover, tasksApprover, executorsApprover, rolesAcceptor)
    .then(defer(master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<AuthorizationAcceptor>>& approvers)
            -> Future<Response> {
          // Unpack the collected results, in the order given to `collect`.
          Owned<ObjectApprover> viewFrameworks = std::get<0>(approvers);
          Owned<ObjectApprover> viewTasks = std::get<1>(approvers);
          Owned<ObjectApprover> viewExecutors = std::get<2>(approvers);
          Owned<AuthorizationAcceptor> viewRoles = std::get<3>(approvers);

          // The response body is streamed through a pipe: the reader
          // end goes into the response, the writer end into the
          // connection registered with the master.
          Pipe pipe;

          OK response;
          response.headers["Content-Type"] = stringify(contentType);
          response.type = Response::PIPE;
          response.reader = pipe.reader();

          HttpConnection http{pipe.writer(), contentType, UUID::random()};
          master->subscribe(http);

          // The first event on the stream carries a state snapshot.
          mesos::master::Event event;
          event.set_type(mesos::master::Event::SUBSCRIBED);
          event.mutable_subscribed()->mutable_get_state()->CopyFrom(
              _getState(viewFrameworks, viewTasks, viewExecutors, viewRoles));

          http.send<mesos::master::Event, v1::master::Event>(event);

          return response;
    }));
}


// TODO(ijimenez): Add some information or pointers to help
// users understand the HTTP Event/Call API.
// Builds the help page for the scheduler endpoint.
string Master::Http::SCHEDULER_HELP()
{
  // Each section is built on its own and then assembled by `HELP`.
  const auto tldrText = TLDR(
      "Endpoint for schedulers to make calls against the master.");

  const auto descriptionText = DESCRIPTION(
      "Returns 202 Accepted iff the request is accepted.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "The returned frameworks information might be filtered based on the",
      "users authorization.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}


// Handles a request on the scheduler endpoint.
//
// The request must be a POST whose body is a `v1::scheduler::Call`,
// encoded either as protobuf or as JSON (selected by the request's
// 'Content-Type' header). The call is devolved to the internal
// `scheduler::Call` and validated before anything else is done with it.
//
// A `SUBSCRIBE` call is answered with an `OK` response whose body is a
// pipe and which carries a freshly generated stream ID in the
// 'Mesos-Stream-Id' header. Every other call must name a known framework
// that is connected via HTTP, must carry the stream ID currently
// associated with that framework, and is answered with `Accepted` after
// being handed to the corresponding `master` method.
//
// The order of the checks below determines which error response a
// malformed request receives.
Future<Response> Master::Http::scheduler(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // TODO(vinod): Add metrics for rejected requests.

  // TODO(vinod): Add support for rate limiting.

  // When current master is not the leader, redirect to the leading master.
  // Note that this could happen if the scheduler realizes this is the
  // leading master before the master itself realizes it, e.g., due to
  // a ZooKeeper watch delay.
  if (!master->elected()) {
    return redirect(request);
  }

  CHECK_SOME(master->recovered);

  if (!master->recovered.get().isReady()) {
    return ServiceUnavailable("Master has not finished recovery");
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  v1::scheduler::Call v1Call;

  // TODO(anand): Content type values are case-insensitive.
  Option<string> contentType = request.headers.get("Content-Type");

  if (contentType.isNone()) {
    return BadRequest("Expecting 'Content-Type' to be present");
  }

  // Decode the body according to the declared content type.
  if (contentType.get() == APPLICATION_PROTOBUF) {
    if (!v1Call.ParseFromString(request.body)) {
      return BadRequest("Failed to parse body into Call protobuf");
    }
  } else if (contentType.get() == APPLICATION_JSON) {
    Try<JSON::Value> value = JSON::parse(request.body);

    if (value.isError()) {
      return BadRequest("Failed to parse body into JSON: " + value.error());
    }

    Try<v1::scheduler::Call> parse =
      ::protobuf::parse<v1::scheduler::Call>(value.get());

    if (parse.isError()) {
      return BadRequest("Failed to convert JSON into Call protobuf: " +
                        parse.error());
    }

    v1Call = parse.get();
  } else {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' of ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
  }

  // Convert the versioned call into the internal representation.
  scheduler::Call call = devolve(v1Call);

  Option<Error> error = validation::scheduler::call::validate(call, principal);

  if (error.isSome()) {
    return BadRequest("Failed to validate scheduler::Call: " +
                      error.get().message);
  }

  if (call.type() == scheduler::Call::SUBSCRIBE) {
    // We default to JSON 'Content-Type' in the response since an
    // empty 'Accept' header results in all media types considered
    // acceptable.
    ContentType acceptType = ContentType::JSON;

    if (request.acceptsMediaType(APPLICATION_JSON)) {
      acceptType = ContentType::JSON;
    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
      acceptType = ContentType::PROTOBUF;
    } else {
      return NotAcceptable(
          string("Expecting 'Accept' to allow ") +
          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
    }

    // Make sure that a stream ID was not included in the request headers.
    if (request.headers.contains("Mesos-Stream-Id")) {
      return BadRequest(
          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
    }

    const FrameworkInfo& frameworkInfo = call.subscribe().framework_info();

    // We allow an authenticated framework to not specify a principal in
    // `FrameworkInfo`, but in that case we log a WARNING here. We also set
    // `FrameworkInfo.principal` to the value of the authenticated principal
    // and use it for authorization later.
    //
    // NOTE: Common validation code, called previously, verifies that the
    // authenticated principal is the same as `FrameworkInfo.principal`,
    // if present.
    if (principal.isSome() && !frameworkInfo.has_principal()) {
      CHECK_SOME(principal->value);

      LOG(WARNING)
        << "Setting 'principal' in FrameworkInfo to '" << principal->value.get()
        << "' because the framework authenticated with that principal but "
        << "did not set it in FrameworkInfo";

      call.mutable_subscribe()->mutable_framework_info()->set_principal(
          principal->value.get());
    }

    // The response body is the read end of a pipe; the write end is
    // handed to the master through the `HttpConnection` below.
    Pipe pipe;
    OK ok;
    ok.headers["Content-Type"] = stringify(acceptType);

    ok.type = Response::PIPE;
    ok.reader = pipe.reader();

    // Generate a stream ID and return it in the response.
    UUID streamId = UUID::random();
    ok.headers["Mesos-Stream-Id"] = streamId.toString();

    HttpConnection http {pipe.writer(), acceptType, streamId};
    master->subscribe(http, call.subscribe());

    return ok;
  }

  // We consolidate the framework lookup logic here because it is
  // common for all the call handlers.
  Framework* framework = master->getFramework(call.framework_id());

  if (framework == nullptr) {
    return BadRequest("Framework cannot be found");
  }

  // TODO(greggomann): Move this implicit scheduler authorization
  // into the authorizer. See MESOS-7399.
  if (principal.isSome() && principal != framework->info.principal()) {
    return BadRequest(
        "Authenticated principal '" + stringify(principal.get()) + "' does not "
        "match principal '" + framework->info.principal() + "' set in "
        "`FrameworkInfo`");
  }

  if (!framework->connected()) {
    return Forbidden("Framework is not subscribed");
  }

  if (framework->http.isNone()) {
    return Forbidden("Framework is not connected via HTTP");
  }

  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
  if (!request.headers.contains("Mesos-Stream-Id")) {
    return BadRequest(
        "All non-subscribe calls should include the 'Mesos-Stream-Id' header");
  }

  // The supplied stream ID must be the one stored on the framework's
  // current HTTP connection.
  const string& streamId = request.headers.at("Mesos-Stream-Id");
  if (streamId != framework->http.get().streamId.toString()) {
    return BadRequest(
        "The stream ID '" + streamId + "' included in this request "
        "didn't match the stream ID currently associated with framework ID "
        + framework->id().value());
  }

  // Hand the call to the matching `master` method. Each handled case
  // returns `Accepted` right after the hand-off.
  switch (call.type()) {
    case scheduler::Call::SUBSCRIBE:
      // SUBSCRIBE call should have been handled above.
      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";

    case scheduler::Call::TEARDOWN:
      master->removeFramework(framework);
      return Accepted();

    case scheduler::Call::ACCEPT:
      master->accept(framework, call.accept());
      return Accepted();

    case scheduler::Call::DECLINE:
      master->decline(framework, call.decline());
      return Accepted();

    case scheduler::Call::ACCEPT_INVERSE_OFFERS:
      master->acceptInverseOffers(framework, call.accept_inverse_offers());
      return Accepted();

    case scheduler::Call::DECLINE_INVERSE_OFFERS:
      master->declineInverseOffers(framework, call.decline_inverse_offers());
      return Accepted();

    case scheduler::Call::REVIVE:
      master->revive(framework, call.revive());
      return Accepted();

    case scheduler::Call::SUPPRESS:
      master->suppress(framework, call.suppress());
      return Accepted();

    case scheduler::Call::KILL:
      master->kill(framework, call.kill());
      return Accepted();

    case scheduler::Call::SHUTDOWN:
      master->shutdown(framework, call.shutdown());
      return Accepted();

    case scheduler::Call::ACKNOWLEDGE:
      master->acknowledge(framework, call.acknowledge());
      return Accepted();

    case scheduler::Call::RECONCILE:
      master->reconcile(framework, call.reconcile());
      return Accepted();

    case scheduler::Call::MESSAGE:
      master->message(framework, call.message());
      return Accepted();

    case scheduler::Call::REQUEST:
      master->request(framework, call.request());
      return Accepted();

    case scheduler::Call::UNKNOWN:
      LOG(WARNING) << "Received 'UNKNOWN' call";
      return NotImplemented();
  }

  // Reached only when `call.type()` matches none of the cases above.
  return NotImplemented();
}


// Returns a copy of `resources` in which the `disk` field of every
// `Resource` has been cleared.
static Resources removeDiskInfos(const Resources& resources)
{
  Resources stripped;

  // Iterate by value: each element is modified before it is accumulated.
  foreach (Resource withoutDisk, resources) {
    withoutDisk.clear_disk();
    stripped += withoutDisk;
  }

  return stripped;
}


// Builds the help page for the endpoint that creates persistent volumes.
string Master::Http::CREATE_VOLUMES_HELP()
{
  // Each section is built on its own and then assembled by `HELP`.
  const auto tldrText = TLDR(
      "Create persistent volumes on reserved resources.");

  const auto descriptionText = DESCRIPTION(
      "Returns 202 ACCEPTED which indicates that the create",
      "operation has been validated successfully by the master.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "The request is then forwarded asynchronously to the Mesos",
      "agent where the reserved resources are located.",
      "That asynchronous message may not be delivered or",
      "creating the volumes at the agent might fail.",
      "",
      "Please provide \"slaveId\" and \"volumes\" values describing",
      "the volumes to be created.");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "Using this endpoint to create persistent volumes requires that",
      "the current principal is authorized to create volumes for the",
      "specific role.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}


// Handles a request to create persistent volumes.
//
// The request must be a POST whose body is a query string carrying a
// "slaveId" parameter and a "volumes" parameter; the latter is a JSON
// array in which every element parses into a `Resource`. Once parsed,
// the work is delegated to `_createVolumes`.
Future<Response> Master::Http::createVolumes(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // Parse the query string in the request body.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  // The agent on which the volumes are to be created.
  const Option<string> agentParameter = parameters.get("slaveId");
  if (agentParameter.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(agentParameter.get());

  // The volumes themselves, as a JSON array of resources.
  const Option<string> volumesParameter = parameters.get("volumes");
  if (volumesParameter.isNone()) {
    return BadRequest("Missing 'volumes' query parameter in the request body");
  }

  Try<JSON::Array> array = JSON::parse<JSON::Array>(volumesParameter.get());

  if (array.isError()) {
    return BadRequest(
        "Error in parsing 'volumes' query parameter in the request body: " +
        array.error());
  }

  RepeatedPtrField<Resource> volumes;
  foreach (const JSON::Value& element, array.get().values) {
    Try<Resource> resource = ::protobuf::parse<Resource>(element);
    if (resource.isError()) {
      return BadRequest(
          "Error in parsing 'volumes' query parameter in the request body: " +
          resource.error());
    }

    volumes.Add()->CopyFrom(resource.get());
  }

  return _createVolumes(slaveId, volumes, principal);
}


// Shared implementation behind both `createVolumes` overloads.
//
// Looks up the registered agent, wraps `volumes` in a CREATE offer
// operation, normalizes and validates it, and then asks the master to
// authorize it. An authorized operation is passed on to `_operation`;
// an unauthorized one yields `Forbidden`.
Future<Response> Master::Http::_createVolumes(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& volumes,
    const Option<Principal>& principal) const
{
  Slave* const agent = master->slaves.registered.get(slaveId);
  if (agent == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Create an offer operation.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::CREATE);
  operation.mutable_create()->mutable_volumes()->CopyFrom(volumes);

  const Option<Error> normalizationError =
    validateAndNormalizeResources(&operation);

  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  const Option<Error> validationError = validation::operation::validate(
      operation.create(),
      agent->checkpointedResources,
      principal,
      agent->capabilities);

  if (validationError.isSome()) {
    return BadRequest(
        "Invalid CREATE operation on agent " + stringify(*agent) + ": " +
        validationError->message);
  }

  return master->authorizeCreateVolume(operation.create(), principal)
    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
      if (authorized) {
        // The resources required for this operation are equivalent to the
        // volumes specified by the user minus any DiskInfo (DiskInfo will
        // be created when this operation is applied).
        return _operation(
            slaveId, removeDiskInfos(operation.create().volumes()), operation);
      }

      return Forbidden();
    }));
}


// Handles a `CREATE_VOLUMES` call by extracting the agent ID and the
// volumes from the call and delegating to `_createVolumes`. The content
// type is not used.
Future<Response> Master::Http::createVolumes(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  CHECK_EQ(mesos::master::Call::CREATE_VOLUMES, call.type());
  CHECK(call.has_create_volumes());

  const auto& params = call.create_volumes();

  return _createVolumes(params.slave_id(), params.volumes(), principal);
}


// Builds the help page for the endpoint that destroys persistent volumes.
string Master::Http::DESTROY_VOLUMES_HELP()
{
  // Each section is built on its own and then assembled by `HELP`.
  const auto tldrText = TLDR(
      "Destroy persistent volumes.");

  const auto descriptionText = DESCRIPTION(
      "Returns 202 ACCEPTED which indicates that the destroy",
      "operation has been validated successfully by the master.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "The request is then forwarded asynchronously to the Mesos",
      "agent where the reserved resources are located.",
      "That asynchronous message may not be delivered or",
      "destroying the volumes at the agent might fail.",
      "",
      "Please provide \"slaveId\" and \"volumes\" values describing",
      "the volumes to be destroyed.");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "Using this endpoint to destroy persistent volumes requires that",
      "the current principal is authorized to destroy volumes created",
      "by the principal who created the volume.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}


// Handles a request to destroy persistent volumes.
//
// The request must be a POST whose body is a query string carrying a
// "slaveId" parameter and a "volumes" parameter; the latter is a JSON
// array in which every element parses into a `Resource`. Once parsed,
// the work is delegated to `_destroyVolumes`.
Future<Response> Master::Http::destroyVolumes(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // Parse the query string in the request body.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  // The agent on which the volumes are to be destroyed.
  const Option<string> agentParameter = parameters.get("slaveId");
  if (agentParameter.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(agentParameter.get());

  // The volumes themselves, as a JSON array of resources.
  const Option<string> volumesParameter = parameters.get("volumes");
  if (volumesParameter.isNone()) {
    return BadRequest("Missing 'volumes' query parameter in the request body");
  }

  Try<JSON::Array> array = JSON::parse<JSON::Array>(volumesParameter.get());

  if (array.isError()) {
    return BadRequest(
        "Error in parsing 'volumes' query parameter in the request body: " +
        array.error());
  }

  RepeatedPtrField<Resource> volumes;
  foreach (const JSON::Value& element, array.get().values) {
    Try<Resource> resource = ::protobuf::parse<Resource>(element);
    if (resource.isError()) {
      return BadRequest(
          "Error in parsing 'volumes' query parameter in the request body: " +
          resource.error());
    }

    volumes.Add()->CopyFrom(resource.get());
  }

  return _destroyVolumes(slaveId, volumes, principal);
}


// Shared implementation behind both `destroyVolumes` overloads.
//
// Looks up the registered agent, wraps `volumes` in a DESTROY offer
// operation, normalizes and validates it, and then asks the master to
// authorize it. An authorized operation is passed on to `_operation`;
// an unauthorized one yields `Forbidden`.
Future<Response> Master::Http::_destroyVolumes(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& volumes,
    const Option<Principal>& principal) const
{
  Slave* const agent = master->slaves.registered.get(slaveId);
  if (agent == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Create an offer operation.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::DESTROY);
  operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);

  const Option<Error> normalizationError =
    validateAndNormalizeResources(&operation);

  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  const Option<Error> validationError = validation::operation::validate(
      operation.destroy(),
      agent->checkpointedResources,
      agent->usedResources,
      agent->pendingTasks);

  if (validationError.isSome()) {
    return BadRequest(
        "Invalid DESTROY operation: " + validationError->message);
  }

  return master->authorizeDestroyVolume(operation.destroy(), principal)
    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
      if (authorized) {
        return _operation(slaveId, operation.destroy().volumes(), operation);
      }

      return Forbidden();
    }));
}


// Handles a `DESTROY_VOLUMES` call by extracting the agent ID and the
// volumes from the call and delegating to `_destroyVolumes`. The content
// type is not used.
Future<Response> Master::Http::destroyVolumes(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  CHECK_EQ(mesos::master::Call::DESTROY_VOLUMES, call.type());
  CHECK(call.has_destroy_volumes());

  const auto& params = call.destroy_volumes();

  return _destroyVolumes(params.slave_id(), params.volumes(), principal);
}


// Builds the help page for the frameworks endpoint.
string Master::Http::FRAMEWORKS_HELP()
{
  // Each section is built on its own and then assembled by `HELP`.
  const auto tldrText = TLDR("Exposes the frameworks info.");

  const auto descriptionText = DESCRIPTION(
      "Returns 200 OK when the frameworks info was queried successfully.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "Query parameters:",
      ">        framework_id=VALUE   The ID of the framework returned "
      "(if no framework ID is specified, all frameworks will be returned).");

  const auto authenticationText = AUTHENTICATION(true);

  const auto authorizationText = AUTHORIZATION(
      "This endpoint might be filtered based on the user accessing it.",
      "See the authorization documentation for details.");

  return HELP(
      tldrText,
      descriptionText,
      authenticationText,
      authorizationText);
}


// Handles a request for the frameworks known to the master and renders
// them as JSON.
//
// The request is redirected when this master is not elected. The JSON
// object has three fields: "frameworks" (taken from
// `master->frameworks.registered`), "completed_frameworks" (taken from
// `master->frameworks.completed`) and "unregistered_frameworks", which
// is always an empty array. A framework is skipped when it is rejected
// either by the acceptor built from the `framework_id` query parameter
// or by the `VIEW_FRAMEWORK` acceptor. The `jsonp` query parameter, if
// any, is passed along to the `OK` response.
Future<Response> Master::Http::frameworks(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  // One acceptor per kind of object that ends up in the output, plus
  // an ID acceptor driven by the optional `framework_id` query parameter.
  Future<Owned<AuthorizationAcceptor>> authorizeFrameworkInfo =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_FRAMEWORK);
  Future<Owned<AuthorizationAcceptor>> authorizeTask =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_TASK);
  Future<Owned<AuthorizationAcceptor>> authorizeExecutorInfo =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_EXECUTOR);
  Future<IDAcceptor<FrameworkID>> selectFrameworkId =
    IDAcceptor<FrameworkID>(request.url.query.get("framework_id"));

  // The continuation is deferred onto `master->self()`; `request` is
  // captured by copy.
  return collect(
      authorizeFrameworkInfo,
      authorizeTask,
      authorizeExecutorInfo,
      selectFrameworkId)
    .then(defer(master->self(),
        [this, request](const tuple<Owned<AuthorizationAcceptor>,
                                    Owned<AuthorizationAcceptor>,
                                    Owned<AuthorizationAcceptor>,
                                    IDAcceptor<FrameworkID>>& acceptors)
          -> Response {
      // This lambda is consumed before the outer lambda
      // returns, hence capture by reference is fine here.
      auto frameworks = [this, &acceptors](JSON::ObjectWriter* writer) {
        // Unpack the collected acceptors into named locals.
        Owned<AuthorizationAcceptor> authorizeFrameworkInfo;
        Owned<AuthorizationAcceptor> authorizeTask;
        Owned<AuthorizationAcceptor> authorizeExecutorInfo;
        IDAcceptor<FrameworkID> selectFrameworkId;
        tie(authorizeFrameworkInfo,
            authorizeTask,
            authorizeExecutorInfo,
            selectFrameworkId) = acceptors;

        // Model all of the frameworks.
        writer->field(
            "frameworks",
            [this,
             &authorizeFrameworkInfo,
             &authorizeTask,
             &authorizeExecutorInfo,
             &selectFrameworkId](JSON::ArrayWriter* writer) {
          foreachvalue (Framework* framework, master->frameworks.registered) {
            // Skip unauthorized frameworks or frameworks without a matching ID.
            if (!selectFrameworkId.accept(framework->id()) ||
                !authorizeFrameworkInfo->accept(framework->info)) {
              continue;
            }

            // The task and executor acceptors are handed to the writer
            // together with the framework.
            FullFrameworkWriter frameworkWriter(
                authorizeTask,
                authorizeExecutorInfo,
                framework);

            writer->element(frameworkWriter);
          }
        });

        // Model all of the completed frameworks.
        writer->field(
            "completed_frameworks",
            [this,
             &authorizeFrameworkInfo,
             &authorizeTask,
             &authorizeExecutorInfo,
             &selectFrameworkId](JSON::ArrayWriter* writer) {
          foreachvalue (const Owned<Framework>& framework,
                        master->frameworks.completed) {
            // Skip unauthorized frameworks or frameworks without a matching ID.
            if (!selectFrameworkId.accept(framework->id()) ||
                !authorizeFrameworkInfo->accept(framework->info)) {
              continue;
            }

            FullFrameworkWriter frameworkWriter(
                authorizeTask,
                authorizeExecutorInfo,
                framework.get());

            writer->element(frameworkWriter);
          }
        });

        // Unregistered frameworks are no longer possible. We emit an
        // empty array for the sake of backward compatibility.
        writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
      };

      return OK(jsonify(frameworks), request.url.query.get("jsonp"));
  }));
}


// Converts a `Framework` into its `GetFrameworks::Framework` protobuf
// representation: framework info, status flags, timestamps, outstanding
// offers and inverse offers, and the used and offered resources.
mesos::master::Response::GetFrameworks::Framework model(
    const Framework& framework)
{
  mesos::master::Response::GetFrameworks::Framework result;

  result.mutable_framework_info()->CopyFrom(framework.info);

  result.set_active(framework.active());
  result.set_connected(framework.connected());
  result.set_recovered(framework.recovered());

  // A timestamp is only written when its nanosecond value is non-zero.
  const int64_t registeredNs = framework.registeredTime.duration().ns();
  if (registeredNs != 0) {
    result.mutable_registered_time()->set_nanoseconds(registeredNs);
  }

  const int64_t unregisteredNs = framework.unregisteredTime.duration().ns();
  if (unregisteredNs != 0) {
    result.mutable_unregistered_time()->set_nanoseconds(unregisteredNs);
  }

  const int64_t reregisteredNs = framework.reregisteredTime.duration().ns();
  if (reregisteredNs != 0) {
    result.mutable_reregistered_time()->set_nanoseconds(reregisteredNs);
  }

  foreach (const Offer* offer, framework.offers) {
    result.add_offers()->CopyFrom(*offer);
  }

  foreach (const InverseOffer* inverseOffer, framework.inverseOffers) {
    result.add_inverse_offers()->CopyFrom(*inverseOffer);
  }

  // Resources are iterated by value because each one is converted
  // in place before being added to the result.
  foreach (Resource used, framework.totalUsedResources) {
    convertResourceFormat(&used, ENDPOINT);

    result.add_allocated_resources()->CopyFrom(used);
  }

  foreach (Resource offered, framework.totalOfferedResources) {
    convertResourceFormat(&offered, ENDPOINT);

    result.add_offered_resources()->CopyFrom(offered);
  }

  return result;
}


// Handles a `GET_FRAMEWORKS` call: obtains an approver for viewing
// frameworks and, once it is available, answers with a serialized
// `GET_FRAMEWORKS` response in the requested content type.
Future<Response> Master::Http::getFrameworks(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());

  // Retrieve `ObjectApprover`s for authorizing frameworks.
  Future<Owned<ObjectApprover>> frameworksApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured: use an `AcceptingObjectApprover`.
    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    frameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);
  }

  return frameworksApprover
    .then(defer(master->self(),
        [=](const Owned<ObjectApprover>& approver) -> Future<Response> {
      mesos::master::Response response;
      response.set_type(mesos::master::Response::GET_FRAMEWORKS);
      response.mutable_get_frameworks()->CopyFrom(_getFrameworks(approver));

      return OK(serialize(contentType, evolve(response)),
                stringify(contentType));
    }));
}


// Builds the `GetFrameworks` message from the master's registered and
// completed frameworks, leaving out every framework the approver does
// not allow to be viewed.
mesos::master::Response::GetFrameworks Master::Http::_getFrameworks(
    const Owned<ObjectApprover>& frameworksApprover) const
{
  mesos::master::Response::GetFrameworks result;

  // Registered frameworks.
  foreachvalue (const Framework* registered,
                master->frameworks.registered) {
    // Only authorized frameworks are included.
    if (approveViewFrameworkInfo(frameworksApprover, registered->info)) {
      result.add_frameworks()->CopyFrom(model(*registered));
    }
  }

  // Completed frameworks.
  foreachvalue (const Owned<Framework>& completed,
                master->frameworks.completed) {
    // Only authorized frameworks are included.
    if (approveViewFrameworkInfo(frameworksApprover, completed->info)) {
      result.add_completed_frameworks()->CopyFrom(model(*completed.get()));
    }
  }

  return result;
}


// Handles a `GET_EXECUTORS` call: obtains approvers for viewing
// frameworks and executors and, once both are available, answers with
// a serialized `GET_EXECUTORS` response in the requested content type.
Future<Response> Master::Http::getExecutors(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());

  // Retrieve `ObjectApprover`s for authorizing frameworks and executors.
  Future<Owned<ObjectApprover>> frameworksApprover;
  Future<Owned<ObjectApprover>> executorsApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured: use `AcceptingObjectApprover`s.
    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    frameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);

    executorsApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_EXECUTOR);
  }

  return collect(frameworksApprover, executorsApprover)
    .then(defer(master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>>& approvers)
          -> Future<Response> {
      // The tuple holds the approvers in the order they were collected:
      // frameworks first, executors second.
      mesos::master::Response response;
      response.set_type(mesos::master::Response::GET_EXECUTORS);

      response.mutable_get_executors()->CopyFrom(
          _getExecutors(std::get<0>(approvers), std::get<1>(approvers)));

      return OK(serialize(contentType, evolve(response)),
                stringify(contentType));
    }));
}


// Builds the `GetExecutors` message: every executor of every viewable
// registered or completed framework, paired with the ID of the agent
// it is keyed under, leaving out executors the approver rejects.
mesos::master::Response::GetExecutors Master::Http::_getExecutors(
    const Owned<ObjectApprover>& frameworksApprover,
    const Owned<ObjectApprover>& executorsApprover) const
{
  // Construct framework list with both active and completed frameworks,
  // keeping only the authorized ones.
  vector<const Framework*> visible;

  foreachvalue (Framework* registered, master->frameworks.registered) {
    if (approveViewFrameworkInfo(frameworksApprover, registered->info)) {
      visible.push_back(registered);
    }
  }

  foreachvalue (const Owned<Framework>& completed,
                master->frameworks.completed) {
    if (approveViewFrameworkInfo(frameworksApprover, completed->info)) {
      visible.push_back(completed.get());
    }
  }

  mesos::master::Response::GetExecutors result;

  foreach (const Framework* framework, visible) {
    foreachpair (const SlaveID& slaveId,
                 const auto& executorsMap,
                 framework->executors) {
      foreachvalue (const ExecutorInfo& executorInfo, executorsMap) {
        const bool authorized = approveViewExecutorInfo(
            executorsApprover,
            executorInfo,
            framework->info);

        // Skip unauthorized executors.
        if (!authorized) {
          continue;
        }

        mesos::master::Response::GetExecutors::Executor* entry =
          result.add_executors();

        entry->mutable_executor_info()->CopyFrom(executorInfo);
        entry->mutable_slave_id()->CopyFrom(slaveId);
      }
    }
  }

  return result;
}


// Handles a `GET_STATE` call: obtains approvers for viewing frameworks,
// tasks and executors plus an acceptor for viewing roles and, once all
// are available, answers with a serialized `GET_STATE` response in the
// requested content type.
Future<Response> Master::Http::getState(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_STATE, call.type());

  // Retrieve approvers for authorizing frameworks, tasks and executors.
  Future<Owned<ObjectApprover>> frameworksApprover;
  Future<Owned<ObjectApprover>> tasksApprover;
  Future<Owned<ObjectApprover>> executorsApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured: use `AcceptingObjectApprover`s.
    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
    executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    frameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);

    tasksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_TASK);

    executorsApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_EXECUTOR);
  }

  Future<Owned<AuthorizationAcceptor>> rolesAcceptor =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_ROLE);

  return collect(
      frameworksApprover, tasksApprover, executorsApprover, rolesAcceptor)
    .then(defer(master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<ObjectApprover>,
                        Owned<AuthorizationAcceptor>>& approvers)
            -> Future<Response> {
          // The tuple holds, in collection order: the frameworks, tasks
          // and executors approvers followed by the roles acceptor.
          mesos::master::Response response;
          response.set_type(mesos::master::Response::GET_STATE);
          response.mutable_get_state()->CopyFrom(
              _getState(
                  std::get<0>(approvers),
                  std::get<1>(approvers),
                  std::get<2>(approvers),
                  std::get<3>(approvers)));

          return OK(
              serialize(contentType, evolve(response)), stringify(contentType));
    }));
}


// Builds the `GetState` message by combining the results of
// `_getTasks`, `_getExecutors`, `_getFrameworks` and `_getAgents`.
mesos::master::Response::GetState Master::Http::_getState(
    const Owned<ObjectApprover>& frameworksApprover,
    const Owned<ObjectApprover>& tasksApprover,
    const Owned<ObjectApprover>& executorsApprover,
    const Owned<AuthorizationAcceptor>& rolesAcceptor) const
{
  // NOTE: This function must be blocking instead of returning a
  // `Future`. This is because `subscribe()` needs to atomically
  // add subscriber to `subscribers` map and send the captured state
  // in `SUBSCRIBED` without being interleaved by any other events.

  mesos::master::Response::GetState state;

  // Tasks.
  state.mutable_get_tasks()->CopyFrom(
      _getTasks(frameworksApprover, tasksApprover));

  // Executors.
  state.mutable_get_executors()->CopyFrom(
      _getExecutors(frameworksApprover, executorsApprover));

  // Frameworks.
  state.mutable_get_frameworks()->CopyFrom(
      _getFrameworks(frameworksApprover));

  // Agents.
  state.mutable_get_agents()->CopyFrom(_getAgents(rolesAcceptor));

  return state;
}


// Error type returned by the flags helpers. The base `Error` is always
// constructed from the stringified `Type`; the failure category is kept in
// `type` and an optional free-form description in `message`.
//
// NOTE(review): if the base `Error` also exposes a member named `message`,
// the `message` declared here shadows it, and it is left empty by the
// single-argument constructor -- confirm against `Error`'s definition.
class Master::Http::FlagsError : public Error
{
public:
  // Categories of failure that the flags helpers can report.
  enum Type
  {
    UNAUTHORIZED
  };

  // TODO(arojas): Provide a proper string representation of the enum.
  explicit FlagsError(Type kind)
    : Error(stringify(kind)), type(kind) {}

  // Same as above, but additionally records a description in `message`.
  FlagsError(Type kind, const string& description)
    : Error(stringify(kind)), type(kind), message(description) {}

  const Type type;
  const string message;
};


// Builds the help text for the flags endpoint: a one-line summary, no long
// description, and the authentication/authorization notes.
string Master::Http::FLAGS_HELP()
{
  string help = HELP(
    TLDR("Exposes the master's flag configuration."),
    None(),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "Querying this endpoint requires that the current principal",
        "is authorized to view all flags.",
        "See the authorization documentation for details."));

  return help;
}


// Handler for the flags endpoint. Rejects principals that carry claims but
// no value, rejects non-GET methods while an authorizer is configured, and
// otherwise answers with the JSON object produced by `_flags()` (wrapped in
// the `jsonp` callback when that query parameter is present).
Future<Response> Master::Http::flags(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // TODO(nfnt): Remove check for enabled
  // authorization as part of MESOS-5346.
  const bool authorizationEnabled = master->authorizer.isSome();

  if (authorizationEnabled && request.method != "GET") {
    return MethodNotAllowed({"GET"}, request.method);
  }

  Option<string> callback = request.url.query.get("jsonp");

  return _flags(principal)
      .then([callback](const Try<JSON::Object, FlagsError>& result)
            -> Future<Response> {
        // Success path first: hand the flags object back to the client.
        if (!result.isError()) {
          return OK(result.get(), callback);
        }

        // Map the known error categories onto HTTP responses.
        switch (result.error().type) {
          case FlagsError::Type::UNAUTHORIZED:
            return Forbidden();
        }

        // Any category not handled above is reported as a server error.
        return InternalServerError(result.error().message);
      });
}


// Produces the flags JSON object, subject to `VIEW_FLAGS` authorization.
// Without an authorizer the object is returned directly; otherwise the
// authorizer is consulted and an `UNAUTHORIZED` `FlagsError` is returned
// when the request is denied.
Future<Try<JSON::Object, Master::Http::FlagsError>> Master::Http::_flags(
    const Option<Principal>& principal) const
{
  // No authorizer configured: nothing to check.
  if (master->authorizer.isNone()) {
    return __flags();
  }

  authorization::Request authorizationRequest;
  authorizationRequest.set_action(authorization::VIEW_FLAGS);

  // Only attach a subject when one could be derived from the principal.
  Option<authorization::Subject> requester = createSubject(principal);
  if (requester.isSome()) {
    authorizationRequest.mutable_subject()->CopyFrom(requester.get());
  }

  return master->authorizer.get()->authorized(authorizationRequest)
      .then(defer(
          master->self(),
          [this](bool authorized) -> Future<Try<JSON::Object, FlagsError>> {
        if (!authorized) {
          return FlagsError(FlagsError::Type::UNAUTHORIZED);
        }

        return __flags();
      }));
}


// Renders the master's flags as `{"flags": {<effective name>: <value>}}`.
// Flags whose `stringify` yields no value are left out.
JSON::Object Master::Http::__flags() const
{
  JSON::Object flagValues;

  foreachvalue (const flags::Flag& flag, master->flags) {
    Option<string> rendered = flag.stringify(master->flags);
    if (rendered.isNone()) {
      continue;
    }

    flagValues.values[flag.effective_name().value] = rendered.get();
  }

  JSON::Object result;
  result.values["flags"] = std::move(flagValues);

  return result;
}


// Handler for the `GET_FLAGS` call: obtains the flags object via `_flags()`
// and serializes its evolved form using the requested content type.
Future<Response> Master::Http::getFlags(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_FLAGS, call.type());

  return _flags(principal)
      .then([contentType](const Try<JSON::Object, FlagsError>& result)
            -> Future<Response> {
        // Success path first: serialize the evolved response.
        if (!result.isError()) {
          return OK(
              serialize(
                  contentType,
                  evolve<v1::master::Response::GET_FLAGS>(result.get())),
              stringify(contentType));
        }

        // Map the known error categories onto HTTP responses.
        switch (result.error().type) {
          case FlagsError::Type::UNAUTHORIZED:
            return Forbidden();
        }

        // Any category not handled above is reported as a server error.
        return InternalServerError(result.error().message);
      });
}


// Builds the help text for the health endpoint.
string Master::Http::HEALTH_HELP()
{
  string help = HELP(
    TLDR(
        "Health status of the Master."),
    DESCRIPTION(
        "Returns 200 OK iff the Master is healthy.",
        "Delayed responses are also indicative of poor health."),
    AUTHENTICATION(false));

  return help;
}


// Handler for the health endpoint: unconditionally answers with an `OK`
// response; the request itself is not inspected.
Future<Response> Master::Http::health(const Request& /*request*/) const
{
  OK healthy;
  return healthy;
}


// Handler for the `GET_HEALTH` call: replies with a `GET_HEALTH` response
// whose `healthy` field is set to true, serialized with `contentType`.
Future<Response> Master::Http::getHealth(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_HEALTH, call.type());

  mesos::master::Response reply;
  reply.set_type(mesos::master::Response::GET_HEALTH);

  auto* status = reply.mutable_get_health();
  status->set_healthy(true);

  return OK(
      serialize(contentType, evolve(reply)),
      stringify(contentType));
}


// Handler for the `GET_VERSION` call: evolves the result of `version()`
// into a `GET_VERSION` response and serializes it with `contentType`.
Future<Response> Master::Http::getVersion(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_VERSION, call.type());

  const auto reply = evolve<v1::master::Response::GET_VERSION>(version());

  return OK(serialize(contentType, reply), stringify(contentType));
}


// Handler for the `GET_METRICS` call: takes a metrics snapshot (honouring
// the optional timeout carried by the call) and returns every name/value
// pair in a `GET_METRICS` response serialized with `contentType`.
Future<Response> Master::Http::getMetrics(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_METRICS, call.type());
  CHECK(call.has_get_metrics());

  const auto& metricsCall = call.get_metrics();

  // Left as none when the call does not specify a timeout.
  Option<Duration> timeout;
  if (metricsCall.has_timeout()) {
    timeout = Nanoseconds(metricsCall.timeout().nanoseconds());
  }

  return process::metrics::snapshot(timeout)
      .then([contentType](const hashmap<string, double>& snapshot)
            -> Response {
        mesos::master::Response reply;
        reply.set_type(mesos::master::Response::GET_METRICS);

        mesos::master::Response::GetMetrics* payload =
          reply.mutable_get_metrics();

        foreachpair (const string& name, double value, snapshot) {
          Metric* entry = payload->add_metrics();
          entry->set_name(name);
          entry->set_value(value);
        }

        return OK(
            serialize(contentType, evolve(reply)),
            stringify(contentType));
      });
}


// Handler for the `GET_LOGGING_LEVEL` call: reports the current value of
// `FLAGS_v` in a `GET_LOGGING_LEVEL` response serialized with `contentType`.
Future<Response> Master::Http::getLoggingLevel(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_LOGGING_LEVEL, call.type());

  mesos::master::Response reply;
  reply.set_type(mesos::master::Response::GET_LOGGING_LEVEL);

  auto* loggingLevel = reply.mutable_get_logging_level();
  loggingLevel->set_level(FLAGS_v);

  return OK(
      serialize(contentType, evolve(reply)),
      stringify(contentType));
}


// Handler for the `SET_LOGGING_LEVEL` call. After the `SET_LOG_LEVEL`
// action has been approved, dispatches `Logging::set_level` with the level
// and duration taken from the call and answers `OK`. Replies `Forbidden`
// when not approved and `InternalServerError` when approval itself fails.
Future<Response> Master::Http::setLoggingLevel(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  CHECK_EQ(mesos::master::Call::SET_LOGGING_LEVEL, call.type());
  CHECK(call.has_set_logging_level());

  const auto& loggingCall = call.set_logging_level();

  uint32_t level = loggingCall.level();
  Duration duration = Nanoseconds(loggingCall.duration().nanoseconds());

  Future<Owned<ObjectApprover>> approver;

  if (master->authorizer.isNone()) {
    // Without an authorizer every request is accepted.
    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    approver = master->authorizer.get()->getObjectApprover(
        subject, authorization::SET_LOG_LEVEL);
  }

  return approver.then(
      [level, duration](const Owned<ObjectApprover>& objectApprover)
      -> Future<Response> {
    Try<bool> approved = objectApprover->approved((ObjectApprover::Object()));

    if (approved.isError()) {
      return InternalServerError("Authorization error: " + approved.error());
    }

    if (!approved.get()) {
      return Forbidden();
    }

    return dispatch(process::logging(), &Logging::set_level, level, duration)
        .then([]() -> Response {
          return OK();
        });
  });
}


// Handler for the `GET_MASTER` call: returns this master's `info()` inside
// a `GET_MASTER` response serialized with `contentType`.
Future<Response> Master::Http::getMaster(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_MASTER, call.type());

  // It is guaranteed that this master has been elected as the leader.
  CHECK(master->elected());

  mesos::master::Response reply;
  reply.set_type(mesos::master::Response::GET_MASTER);

  auto* masterInfo = reply.mutable_get_master()->mutable_master_info();
  masterInfo->CopyFrom(master->info());

  return OK(
      serialize(contentType, evolve(reply)),
      stringify(contentType));
}


// Builds the help text for the redirect endpoint.
string Master::Http::REDIRECT_HELP()
{
  string help = HELP(
    TLDR(
        "Redirects to the leading Master."),
    DESCRIPTION(
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "**NOTES:**",
        "1. This is the recommended way to bookmark the WebUI when "
        "running multiple Masters.",
        "2. This is broken currently \"on the cloud\" (e.g., EC2) as "
        "this will attempt to redirect to the private IP address, unless "
        "`advertise_ip` points to an externally accessible IP"),
    AUTHENTICATION(false));

  return help;
}


// Redirects `request` to the leading master.
//
// Responds with `ServiceUnavailable` when no leader is known and with
// `InternalServerError` when the leader's hostname cannot be determined.
// Requests for the redirect endpoint itself are sent to the leader's base
// URL, paths nested below the redirect endpoint yield `NotFound`, and any
// other request is redirected to the same URL on the leader.
Future<Response> Master::Http::redirect(const Request& request) const
{
  // If there's no leader, return `ServiceUnavailable`.
  if (master->leader.isNone()) {
    LOG(WARNING) << "Current master is not elected as leader, and leader "
                 << "information is unavailable. Failed to redirect the "
                 << "request url: " << request.url;
    return ServiceUnavailable("No leader elected");
  }

  MasterInfo leader = master->leader.get();

  // Prefer the advertised hostname and fall back to resolving the IP.
  //
  // NOTE: Currently, 'leader.ip()' stores ip in network order, which
  // should be fixed. See MESOS-1201 for details.
  Try<string> leaderHostname = leader.has_hostname()
    ? leader.hostname()
    : net::getHostname(net::IP(ntohl(leader.ip())));

  if (leaderHostname.isError()) {
    return InternalServerError(leaderHostname.error());
  }

  LOG(INFO) << "Redirecting request for " << request.url
            << " to the leading master " << leaderHostname.get();

  // NOTE: We can use a protocol-relative URL here in order to allow
  // the browser (or other HTTP client) to prefix with 'http:' or
  // 'https:' depending on the original request. See
  // https://tools.ietf.org/html/rfc7231#section-7.1.2 as well as
  // http://stackoverflow.com/questions/12436669/using-protocol-relative-uris-within-location-headers
  // which discusses this.
  string leaderBase =
    "//" + leaderHostname.get() + ":" + stringify(leader.port());

  string plainEndpoint = "/redirect";
  string prefixedEndpoint = "/" + master->self().id + "/redirect";

  const auto& requestedPath = request.url.path;

  // When request url is '/redirect' or '/master/redirect', redirect to the
  // base url of leading master to avoid infinite redirect loop.
  const bool isRedirectEndpoint =
    requestedPath == plainEndpoint || requestedPath == prefixedEndpoint;

  if (isRedirectEndpoint) {
    return TemporaryRedirect(leaderBase);
  }

  // Prevent redirection loop.
  const bool isBelowRedirectEndpoint =
    strings::startsWith(requestedPath, plainEndpoint + "/") ||
    strings::startsWith(requestedPath, prefixedEndpoint + "/");

  if (isBelowRedirectEndpoint) {
    return NotFound();
  }

  // `request.url` is not absolute so we can safely append it to
  // `leaderBase`. See https://tools.ietf.org/html/rfc2616#section-5.1.2
  // for details.
  CHECK(!request.url.isAbsolute());
  return TemporaryRedirect(leaderBase + stringify(request.url));
}


// Builds the help text for the reserve endpoint.
string Master::Http::RESERVE_HELP()
{
  string help = HELP(
    TLDR(
        "Reserve resources dynamically on a specific agent."),
    DESCRIPTION(
        "Returns 202 ACCEPTED which indicates that the reserve",
        "operation has been validated successfully by the master.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "The request is then forwarded asynchronously to the Mesos",
        "agent where the reserved resources are located.",
        "That asynchronous message may not be delivered or",
        "reserving resources at the agent might fail.",
        "",
        "Please provide \"slaveId\" and \"resources\" values describing",
        "the resources to be reserved."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "Using this endpoint to reserve resources requires that the",
        "current principal is authorized to reserve resources for the",
        "specific role.",
        "See the authorization documentation for details."));

  return help;
}


// Handler for the reserve endpoint.
//
// Expects a POST whose body is a query string carrying "slaveId" and
// "resources" (a JSON array of `Resource` objects). After the preliminary
// checks the parsed values are handed to `_reserve()`. Malformed input is
// answered with `BadRequest`; a non-leading master redirects to the leader.
Future<Response> Master::Http::reserve(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // Parse the query string in the request body.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  // The agent the reservation applies to.
  Option<string> slaveIdParameter = parameters.get("slaveId");
  if (slaveIdParameter.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(slaveIdParameter.get());

  // The resources to reserve, encoded as a JSON array.
  Option<string> resourcesParameter = parameters.get("resources");
  if (resourcesParameter.isNone()) {
    return BadRequest(
        "Missing 'resources' query parameter in the request body");
  }

  Try<JSON::Array> parsed =
    JSON::parse<JSON::Array>(resourcesParameter.get());

  if (parsed.isError()) {
    return BadRequest(
        "Error in parsing 'resources' query parameter in the request body: " +
        parsed.error());
  }

  // Convert every array element into a `Resource` protobuf.
  RepeatedPtrField<Resource> resources;
  foreach (const JSON::Value& element, parsed.get().values) {
    Try<Resource> resource = ::protobuf::parse<Resource>(element);
    if (resource.isError()) {
      return BadRequest(
          "Error in parsing 'resources' query parameter in the request body: " +
          resource.error());
    }

    resources.Add()->CopyFrom(resource.get());
  }

  return _reserve(slaveId, resources, principal);
}


// Shared implementation behind `reserve()` and `reserveResources()`.
//
// Builds a RESERVE offer operation for `resources` on the agent `slaveId`,
// validates it, asks the master to authorize it for `principal`, and then
// forwards it through `_operation()`. Replies `BadRequest` when the agent is
// not registered or validation fails, and `Forbidden` when not authorized.
Future<Response> Master::Http::_reserve(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& resources,
    const Option<Principal>& principal) const
{
  Slave* slave = master->slaves.registered.get(slaveId);
  if (slave == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Create an offer operation.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::RESERVE);
  operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);

  Option<Error> normalizationError = validateAndNormalizeResources(&operation);
  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  Option<Error> validationError = validation::operation::validate(
      operation.reserve(), principal, slave->capabilities);

  if (validationError.isSome()) {
    return BadRequest(
        "Invalid RESERVE operation on agent " + stringify(*slave) + ": " +
        validationError->message);
  }

  return master->authorizeReserveResources(operation.reserve(), principal)
    .then(defer(
        master->self(),
        [this, slaveId, operation](bool authorized) -> Future<Response> {
      if (!authorized) {
        return Forbidden();
      }

      // We only allow "pushing" a single reservation at a time, so we require
      // the resources with one reservation "popped" to be present on the agent.
      Resources requested(operation.reserve().resources());
      Resources required = requested.popReservation();

      return _operation(slaveId, required, operation);
    }));
}


// Handler for the `RESERVE_RESOURCES` call: extracts the agent id and the
// resources from the call and delegates to `_reserve()`.
Future<Response> Master::Http::reserveResources(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::RESERVE_RESOURCES, call.type());

  const auto& reservation = call.reserve_resources();

  return _reserve(reservation.slave_id(), reservation.resources(), principal);
}


// Builds the help text for the slaves endpoint.
string Master::Http::SLAVES_HELP()
{
  string help = HELP(
    TLDR(
        "Information about agents."),
    DESCRIPTION(
        "Returns 200 OK when the request was processed successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "This endpoint shows information about the agents which are registered",
        "in this master or recovered from registry, formatted as a JSON",
        "object.",
        "",
        "Query parameters:",
        ">        slave_id=VALUE       The ID of the slave returned "
        "(when no slave_id is specified, all slaves will be returned)."),
    AUTHENTICATION(true));

  return help;
}


// Handler for the slaves endpoint: writes the master's agents as JSON via
// `SlavesWriter`, passing it a `VIEW_ROLE` acceptor and an id acceptor built
// from the optional `slave_id` query parameter. Honours the `jsonp` query
// parameter and redirects to the leader when this master is not elected.
Future<Response> Master::Http::slaves(
    const Request& request,
    const Option<Principal>& principal) const
{
  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  Future<Owned<AuthorizationAcceptor>> roleAcceptorFuture =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_ROLE);

  Future<IDAcceptor<SlaveID>> slaveIdSelectorFuture =
    IDAcceptor<SlaveID>(request.url.query.get("slave_id"));

  // The continuation captures the master pointer itself instead of `this`.
  Master* const owner = this->master;
  Option<string> callback = request.url.query.get("jsonp");

  return collect(roleAcceptorFuture, slaveIdSelectorFuture)
    .then(defer(owner->self(),
        [owner, callback](const tuple<Owned<AuthorizationAcceptor>,
                                      IDAcceptor<SlaveID>>& acceptors)
          -> Future<Response> {
      Owned<AuthorizationAcceptor> roleAcceptor = std::get<0>(acceptors);
      IDAcceptor<SlaveID> slaveIdSelector = std::get<1>(acceptors);

      return OK(
          jsonify(SlavesWriter(owner->slaves, roleAcceptor, slaveIdSelector)),
          callback);
  }));
}


// Handler for the `GET_AGENTS` call: creates a `VIEW_ROLE` acceptor for the
// principal and returns the `_getAgents()` result in a `GET_AGENTS`
// response serialized with `contentType`.
Future<process::http::Response> Master::Http::getAgents(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());

  Future<Owned<AuthorizationAcceptor>> rolesAcceptorFuture =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_ROLE);

  return rolesAcceptorFuture
    .then(defer(master->self(),
        [this, contentType](const Owned<AuthorizationAcceptor>& rolesAcceptor)
            -> Future<process::http::Response> {
          mesos::master::Response reply;
          reply.set_type(mesos::master::Response::GET_AGENTS);

          auto* agents = reply.mutable_get_agents();
          agents->CopyFrom(_getAgents(rolesAcceptor));

          return OK(
              serialize(contentType, evolve(reply)),
              stringify(contentType));
    }));
}


// Builds the `GetAgents` payload. Every registered agent is added via
// `createAgentResponse()`. Every recovered agent is added as a copy of its
// `SlaveInfo`, with its resources reduced to those accepted by
// `authorizeResource()` for `rolesAcceptor`.
mesos::master::Response::GetAgents Master::Http::_getAgents(
    const Owned<AuthorizationAcceptor>& rolesAcceptor) const
{
  mesos::master::Response::GetAgents result;

  // Agents currently registered with this master.
  foreachvalue (const Slave* slave, master->slaves.registered) {
    mesos::master::Response::GetAgents::Agent* registered =
      result.add_agents();

    registered->CopyFrom(
        protobuf::master::event::createAgentResponse(*slave, rolesAcceptor));
  }

  // Agents that are recovered but not registered.
  foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
    SlaveInfo* recovered = result.add_recovered_agents();
    recovered->CopyFrom(slaveInfo);

    // Drop the copied resources and re-add only the authorized ones.
    recovered->clear_resources();
    foreach (const Resource& resource, slaveInfo.resources()) {
      if (!authorizeResource(resource, rolesAcceptor)) {
        continue;
      }

      recovered->add_resources()->CopyFrom(resource);
    }
  }

  return result;
}


// Builds the help text for the quota endpoint.
string Master::Http::QUOTA_HELP()
{
  string help = HELP(
    TLDR(
        "Gets or updates quota for roles."),
    DESCRIPTION(
        "Returns 200 OK when the quota was queried or updated successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "GET: Returns the currently set quotas as JSON.",
        "",
        "POST: Validates the request body as JSON",
        " and sets quota for a role.",
        "",
        "DELETE: Validates the request body as JSON",
        " and removes quota for a role."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "Using this endpoint to set a quota for a certain role requires that",
        "the current principal is authorized to set quota for the target role.",
        "Similarly, removing quota requires that the principal is authorized",
        "to remove quota created by the quota_principal.",
        "Getting quota information for a certain role requires that the",
        "current principal is authorized to get quota for the target role,",
        "otherwise the entry for the target role could be silently filtered.",
        "See the authorization documentation for details."));

  return help;
}


// Handler for the quota endpoint. After the preliminary checks it routes
// GET, POST and DELETE to `quotaHandler.status()`, `.set()` and `.remove()`
// respectively; every other method is answered with `MethodNotAllowed`.
Future<Response> Master::Http::quota(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  // Dispatch based on HTTP method to separate `QuotaHandler`.
  const auto& method = request.method;

  if (method == "GET") {
    return quotaHandler.status(request, principal);
  } else if (method == "POST") {
    return quotaHandler.set(request, principal);
  } else if (method == "DELETE") {
    return quotaHandler.remove(request, principal);
  }

  // TODO(joerg84): Add update logic for PUT requests
  // once Quota supports updates.

  return MethodNotAllowed({"GET", "POST", "DELETE"}, request.method);
}


// Builds the help text for the weights endpoint.
string Master::Http::WEIGHTS_HELP()
{
  string help = HELP(
    TLDR(
        "Updates weights for the specified roles."),
    DESCRIPTION(
        "Returns 200 OK when the weights update was successful.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "PUT: Validates the request body as JSON",
        "and updates the weights for the specified roles."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "Getting weight information for a role requires that the current",
        "principal is authorized to get weights for the target role,",
        "otherwise the entry for the target role could be silently filtered.",
        "See the authorization documentation for details."));

  return help;
}


// Handler for the weights endpoint. After the preliminary checks it routes
// GET to `weightsHandler.get()` and PUT to `weightsHandler.update()`; every
// other method is answered with `MethodNotAllowed`.
Future<Response> Master::Http::weights(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  const bool claimsWithoutValue =
    principal.isSome() && principal->value.isNone();

  if (claimsWithoutValue) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  // Dispatch based on HTTP method to separate `WeightsHandler`.
  const auto& method = request.method;

  if (method == "GET") {
    return weightsHandler.get(request, principal);
  } else if (method == "PUT") {
    return weightsHandler.update(request, principal);
  }

  return MethodNotAllowed({"GET", "PUT"}, request.method);
}


// Builds the help text for the state endpoint, including a (non-exhaustive)
// example of the JSON document it returns.
string Master::Http::STATE_HELP()
{
  string help = HELP(
    TLDR(
        "Information about state of master."),
    DESCRIPTION(
        "Returns 200 OK when the state of the master was queried successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "This endpoint shows information about the frameworks, tasks,",
        "executors, and agents running in the cluster as a JSON object.",
        "The information shown might be filtered based on the user",
        "accessing the endpoint.",
        "",
        "Example (**Note**: this is not exhaustive):",
        "",
        "```",
        "{",
        "    \"version\" : \"0.28.0\",",
        "    \"git_sha\" : \"9d5889b5a265849886a533965f4aefefd1fbd103\",",
        "    \"git_branch\" : \"refs/heads/master\",",
        "    \"git_tag\" : \"0.28.0\",",
        "    \"build_date\" : \"2016-02-15 10:00:28\",",
        "    \"build_time\" : 1455559228,",
        "    \"build_user\" : \"mesos-user\",",
        "    \"start_time\" : 1455643643.42422,",
        "    \"elected_time\" : 1455643643.43457,",
        "    \"id\" : \"b5eac2c5-609b-4ca1-a352-61941702fc9e\",",
        "    \"pid\" : \"master@127.0.0.1:5050\",",
        "    \"hostname\" : \"localhost\",",
        "    \"activated_slaves\" : 0,",
        "    \"deactivated_slaves\" : 0,",
        "    \"cluster\" : \"test-cluster\",",
        "    \"leader\" : \"master@127.0.0.1:5050\",",
        "    \"log_dir\" : \"/var/log\",",
        "    \"external_log_file\" : \"mesos.log\",",
        "    \"flags\" : {",
        "         \"framework_sorter\" : \"drf\",",
        "         \"authenticate\" : \"false\",",
        "         \"logbufsecs\" : \"0\",",
        "         \"initialize_driver_logging\" : \"true\",",
        "         \"work_dir\" : \"/var/lib/mesos\",",
        "         \"http_authenticators\" : \"basic\",",
        "         \"authorizers\" : \"local\",",
        "         \"agent_reregister_timeout\" : \"10mins\",",
        "         \"logging_level\" : \"INFO\",",
        "         \"help\" : \"false\",",
        "         \"root_submissions\" : \"true\",",
        "         \"ip\" : \"127.0.0.1\",",
        "         \"user_sorter\" : \"drf\",",
        "         \"version\" : \"false\",",
        "         \"max_agent_ping_timeouts\" : \"5\",",
        "         \"agent_ping_timeout\" : \"15secs\",",
        "         \"registry_store_timeout\" : \"20secs\",",
        "         \"max_completed_frameworks\" : \"50\",",
        "         \"quiet\" : \"false\",",
        "         \"allocator\" : \"HierarchicalDRF\",",
        "         \"hostname_lookup\" : \"true\",",
        "         \"authenticators\" : \"crammd5\",",
        "         \"max_completed_tasks_per_framework\" : \"1000\",",
        "         \"registry\" : \"replicated_log\",",
        "         \"registry_strict\" : \"false\",",
        "         \"log_auto_initialize\" : \"true\",",
        "         \"authenticate_agents\" : \"false\",",
        "         \"registry_fetch_timeout\" : \"1mins\",",
        "         \"allocation_interval\" : \"1secs\",",
        "         \"authenticate_http\" : \"false\",",
        "         \"port\" : \"5050\",",
        "         \"zk_session_timeout\" : \"10secs\",",
        "         \"recovery_agent_removal_limit\" : \"100%\",",
        "         \"webui_dir\" : \"/path/to/mesos/build/../src/webui\",",
        "         \"cluster\" : \"mycluster\",",
        "         \"leader\" : \"master@127.0.0.1:5050\",",
        "         \"log_dir\" : \"/var/log\",",
        "         \"external_log_file\" : \"mesos.log\"",
        "    },",
        "    \"slaves\" : [],",
        "    \"frameworks\" : [],",
        "    \"completed_frameworks\" : [],",
        "    \"orphan_tasks\" : [],",
        "    \"unregistered_frameworks\" : []",
        "}",
        "```"),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "This endpoint might be filtered based on the user accessing it.",
        "For example a user might only see the subset of frameworks,",
        "tasks, and executors they are allowed to view.",
        "See the authorization documentation for details."));

  return help;
}


Future<Response> Master::Http::state(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  Future<Owned<AuthorizationAcceptor>> authorizeRole =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_ROLE);
  Future<Owned<AuthorizationAcceptor>> authorizeFrameworkInfo =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_FRAMEWORK);
  Future<Owned<AuthorizationAcceptor>> authorizeTask =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_TASK);
  Future<Owned<AuthorizationAcceptor>> authorizeExecutorInfo =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_EXECUTOR);
  Future<Owned<AuthorizationAcceptor>> authorizeFlags =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_FLAGS);

  return collect(
      authorizeRole,
      authorizeFrameworkInfo,
      authorizeTask,
      authorizeExecutorInfo,
      authorizeFlags)
    .then(defer(
        master->self(),
        [this, request](const tuple<Owned<AuthorizationAcceptor>,
                                    Owned<AuthorizationAcceptor>,
                                    Owned<AuthorizationAcceptor>,
                                    Owned<AuthorizationAcceptor>,
                                    Owned<AuthorizationAcceptor>>& acceptors)
          -> Response {
      // This lambda is consumed before the outer lambda
      // returns, hence capture by reference is fine here.
      auto state = [this, &acceptors](JSON::ObjectWriter* writer) {
        Owned<AuthorizationAcceptor> authorizeRole;
        Owned<AuthorizationAcceptor> authorizeFrameworkInfo;
        Owned<AuthorizationAcceptor> authorizeTask;
        Owned<AuthorizationAcceptor> authorizeExecutorInfo;
        Owned<AuthorizationAcceptor> authorizeFlags;
        tie(authorizeRole,
            authorizeFrameworkInfo,
            authorizeTask,
            authorizeExecutorInfo,
            authorizeFlags) = acceptors;

        writer->field("version", MESOS_VERSION);

        if (build::GIT_SHA.isSome()) {
          writer->field("git_sha", build::GIT_SHA.get());
        }

        if (build::GIT_BRANCH.isSome()) {
          writer->field("git_branch", build::GIT_BRANCH.get());
        }

        if (build::GIT_TAG.isSome()) {
          writer->field("git_tag", build::GIT_TAG.get());
        }

        writer->field("build_date", build::DATE);
        writer->field("build_time", build::TIME);
        writer->field("build_user", build::USER);
        writer->field("start_time", master->startTime.secs());

        if (master->electedTime.isSome()) {
          writer->field("elected_time", master->electedTime.get().secs());
        }

        writer->field("id", master->info().id());
        writer->field("pid", string(master->self()));
        writer->field("hostname", master->info().hostname());
        writer->field("activated_slaves", master->_slaves_active());
        writer->field("deactivated_slaves", master->_slaves_inactive());
        writer->field("unreachable_slaves", master->_slaves_unreachable());

        if (master->info().has_domain()) {
          writer->field("domain", master->info().domain());
        }

        // TODO(haosdent): Deprecated this in favor of `leader_info` below.
        if (master->leader.isSome()) {
          writer->field("leader", master->leader->pid());
        }

        if (master->leader.isSome()) {
          writer->field("leader_info", [this](JSON::ObjectWriter* writer) {
            json(writer, master->leader.get());
          });
        }

        if (authorizeFlags->accept()) {
          if (master->flags.cluster.isSome()) {
            writer->field("cluster", master->flags.cluster.get());
          }

          if (master->flags.log_dir.isSome()) {
            writer->field("log_dir", master->flags.log_dir.get());
          }

          if (master->flags.external_log_file.isSome()) {
            writer->field("external_log_file",
                          master->flags.external_log_file.get());
          }

          writer->field("flags", [this](JSON::ObjectWriter* writer) {
              foreachvalue (const flags::Flag& flag, master->flags) {
                Option<string> value = flag.stringify(master->flags);
                if (value.isSome()) {
                  writer->field(flag.effective_name().value, value.get());
                }
              }
            });
        }

        // Model all of the registered slaves.
        writer->field("slaves",
          [this, &authorizeRole](JSON::ArrayWriter* writer) {
            foreachvalue (Slave* slave, master->slaves.registered) {
              writer->element(SlaveWriter(*slave, authorizeRole));
            }
          });

        // Model all of the recovered slaves.
        writer->field("recovered_slaves", [this](JSON::ArrayWriter* writer) {
          foreachvalue (const SlaveInfo& slaveInfo, master->slaves.recovered) {
            writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
              json(writer, slaveInfo);
            });
          }
        });

        // Model all of the frameworks.
        writer->field(
            "frameworks",
            [this,
             &authorizeFrameworkInfo,
             &authorizeTask,
             &authorizeExecutorInfo](JSON::ArrayWriter* writer) {
          foreachvalue (
              Framework* framework,
              master->frameworks.registered) {
            // Skip unauthorized frameworks.
            if (!authorizeFrameworkInfo->accept(framework->info)) {
              continue;
            }

            auto frameworkWriter = FullFrameworkWriter(
                authorizeTask,
                authorizeExecutorInfo,
                framework);

            writer->element(frameworkWriter);
          }
        });

        // Model all of the completed frameworks.
        writer->field(
            "completed_frameworks",
            [this,
             &authorizeFrameworkInfo,
             &authorizeTask,
             &authorizeExecutorInfo](JSON::ArrayWriter* writer) {
          foreachvalue (const Owned<Framework>& framework,
                        master->frameworks.completed) {
            // Skip unauthorized frameworks.
            if (!authorizeFrameworkInfo->accept(framework->info)) {
              continue;
            }

            auto frameworkWriter = FullFrameworkWriter(
                authorizeTask,
                authorizeExecutorInfo,
                framework.get());

            writer->element(frameworkWriter);
          }
        });

        // Orphan tasks are no longer possible. We emit an empty array
        // for the sake of backward compatibility.
        writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});

        // Unregistered frameworks are no longer possible. We emit an
        // empty array for the sake of backward compatibility.
        writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
      };

      return OK(jsonify(state), request.url.query.get("jsonp"));
    }));
}


// Serves the `READ_FILE` call by delegating to `master->files->read()` and
// translating the outcome into an HTTP response: a `FilesError` is mapped
// onto the matching error response, while a successful read is returned as
// a `READ_FILE` response serialized in the requested content type.
Future<Response> Master::Http::readFile(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::READ_FILE, call.type());

  const auto& params = call.read_file();

  // `length` stays `None` unless the caller supplied one.
  Option<size_t> length;
  if (params.has_length()) {
    length = params.length();
  }

  const size_t offset = params.offset();
  const string& path = params.path();

  return master->files->read(offset, length, path, principal)
    .then([contentType](const Try<tuple<size_t, string>, FilesError>& result)
        -> Future<Response> {
      if (!result.isError()) {
        // The tuple carries (size, data) in that order.
        const tuple<size_t, string>& contents = result.get();

        mesos::master::Response response;
        response.set_type(mesos::master::Response::READ_FILE);

        mesos::master::Response::ReadFile* payload =
          response.mutable_read_file();

        payload->set_size(std::get<0>(contents));
        payload->set_data(std::get<1>(contents));

        return OK(serialize(contentType, evolve(response)),
                  stringify(contentType));
      }

      const FilesError& error = result.error();

      // No default case, so that a new error type triggers a compiler
      // warning here.
      switch (error.type) {
        case FilesError::Type::INVALID:
          return BadRequest(error.message);

        case FilesError::Type::UNAUTHORIZED:
          return Forbidden(error.message);

        case FilesError::Type::NOT_FOUND:
          return NotFound(error.message);

        case FilesError::Type::UNKNOWN:
          return InternalServerError(error.message);
      }

      UNREACHABLE();
    });
}


// This abstraction has no side-effects. It factors out computing the
// mapping from 'slaves' to 'frameworks' to answer the questions 'what
// frameworks are running on a given slave?' and 'what slaves are
// running the given framework?'.
class SlaveFrameworkMapping
{
public:
  SlaveFrameworkMapping(const hashmap<FrameworkID, Framework*>& frameworks)
  {
    foreachpair (const FrameworkID& frameworkId,
                 const Framework* framework,
                 frameworks) {
      foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
        frameworksToSlaves[frameworkId].insert(taskInfo.slave_id());
        slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId);
      }

      foreachvalue (const Task* task, framework->tasks) {
        frameworksToSlaves[frameworkId].insert(task->slave_id());
        slavesToFrameworks[task->slave_id()].insert(frameworkId);
      }

      foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
        frameworksToSlaves[frameworkId].insert(task->slave_id());
        slavesToFrameworks[task->slave_id()].insert(frameworkId);
      }

      foreach (const Owned<Task>& task, framework->completedTasks) {
        frameworksToSlaves[frameworkId].insert(task->slave_id());
        slavesToFrameworks[task->slave_id()].insert(frameworkId);
      }
    }
  }

  const hashset<FrameworkID>& frameworks(const SlaveID& slaveId) const
  {
    const auto iterator = slavesToFrameworks.find(slaveId);
    return iterator != slavesToFrameworks.end() ?
      iterator->second : hashset<FrameworkID>::EMPTY;
  }

  const hashset<SlaveID>& slaves(const FrameworkID& frameworkId) const
  {
    const auto iterator = frameworksToSlaves.find(frameworkId);
    return iterator != frameworksToSlaves.end() ?
      iterator->second : hashset<SlaveID>::EMPTY;
  }

private:
  hashmap<SlaveID, hashset<FrameworkID>> slavesToFrameworks;
  hashmap<FrameworkID, hashset<SlaveID>> frameworksToSlaves;
};


// Side-effect free tally of how many tasks are in each 'TaskState'.
// The same structure is used to summarize tasks per framework and
// per slave.
struct TaskStateSummary
{
  // TODO(jmlvanre): Possibly clean this up as per MESOS-2694.
  const static TaskStateSummary EMPTY;

  // One counter per task state; all of them start out at zero.
  size_t staging = 0;
  size_t starting = 0;
  size_t running = 0;
  size_t killing = 0;
  size_t finished = 0;
  size_t killed = 0;
  size_t failed = 0;
  size_t lost = 0;
  size_t error = 0;
  size_t dropped = 0;
  size_t unreachable = 0;
  size_t gone = 0;
  size_t gone_by_operator = 0;
  size_t unknown = 0;

  // Kept user-provided so that the `const` instance `EMPTY` can be
  // default-initialized.
  TaskStateSummary() {}

  // Bumps the counter that corresponds to the state of the given task.
  void count(const Task& task)
  {
    // No default case allows for a helpful compiler error if we
    // introduce a new state.
    switch (task.state()) {
      case TASK_STAGING:
        ++staging;
        break;
      case TASK_STARTING:
        ++starting;
        break;
      case TASK_RUNNING:
        ++running;
        break;
      case TASK_KILLING:
        ++killing;
        break;
      case TASK_FINISHED:
        ++finished;
        break;
      case TASK_KILLED:
        ++killed;
        break;
      case TASK_FAILED:
        ++failed;
        break;
      case TASK_LOST:
        ++lost;
        break;
      case TASK_ERROR:
        ++error;
        break;
      case TASK_DROPPED:
        ++dropped;
        break;
      case TASK_UNREACHABLE:
        ++unreachable;
        break;
      case TASK_GONE:
        ++gone;
        break;
      case TASK_GONE_BY_OPERATOR:
        ++gone_by_operator;
        break;
      case TASK_UNKNOWN:
        ++unknown;
        break;
    }
  }
};


// Shared all-zero summary. `TaskStateSummaries` hands it out by reference
// for framework / slave ids that have no recorded tasks.
const TaskStateSummary TaskStateSummary::EMPTY;


// This abstraction has no side-effects. It factors out computing the
// 'TaskState' summaries for frameworks and slaves. This answers the
// questions 'How many tasks are in each state for a given framework?'
// and 'How many tasks are in each state for a given slave?'.
class TaskStateSummaries
{
public:
  TaskStateSummaries(const hashmap<FrameworkID, Framework*>& frameworks)
  {
    foreachpair (const FrameworkID& frameworkId,
                 const Framework* framework,
                 frameworks) {
      foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
        frameworkTaskSummaries[frameworkId].staging++;
        slaveTaskSummaries[taskInfo.slave_id()].staging++;
      }

      foreachvalue (const Task* task, framework->tasks) {
        frameworkTaskSummaries[frameworkId].count(*task);
        slaveTaskSummaries[task->slave_id()].count(*task);
      }

      foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
        frameworkTaskSummaries[frameworkId].count(*task.get());
        slaveTaskSummaries[task->slave_id()].count(*task.get());
      }

      foreach (const Owned<Task>& task, framework->completedTasks) {
        frameworkTaskSummaries[frameworkId].count(*task.get());
        slaveTaskSummaries[task->slave_id()].count(*task.get());
      }
    }
  }

  const TaskStateSummary& framework(const FrameworkID& frameworkId) const
  {
    const auto iterator = frameworkTaskSummaries.find(frameworkId);
    return iterator != frameworkTaskSummaries.end() ?
      iterator->second : TaskStateSummary::EMPTY;
  }

  const TaskStateSummary& slave(const SlaveID& slaveId) const
  {
    const auto iterator = slaveTaskSummaries.find(slaveId);
    return iterator != slaveTaskSummaries.end() ?
      iterator->second : TaskStateSummary::EMPTY;
  }

private:
  hashmap<FrameworkID, TaskStateSummary> frameworkTaskSummaries;
  hashmap<SlaveID, TaskStateSummary> slaveTaskSummaries;
};


// Builds the help text for the state summary endpoint: a one-line
// summary, the possible response codes, and notes that the endpoint
// requires authentication and that its output may be filtered by
// authorization.
string Master::Http::STATESUMMARY_HELP()
{
  return HELP(
    TLDR(
        "Summary of agents, tasks, and registered frameworks in cluster."),
    DESCRIPTION(
        "Returns 200 OK when a summary of the master's state was queried",
        "successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "This endpoint gives a summary of the agents, tasks, and",
        "registered frameworks in the cluster as a JSON object.",
        "The information shown might be filtered based on the user",
        "accessing the endpoint."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "This endpoint might be filtered based on the user accessing it.",
        "For example a user might only see the subset of frameworks",
        "they are allowed to view.",
        "See the authorization documentation for details."));
}


// Produces the JSON state summary described by `STATESUMMARY_HELP()`.
//
// The request is rejected with `Forbidden` when the principal carries
// claims but no value, and redirected when this master is not elected.
// Otherwise the VIEW_ROLE and VIEW_FRAMEWORK acceptors are created and,
// once both are available, the summary is rendered on the master's
// context (via `defer(master->self(), ...)`). Frameworks rejected by the
// VIEW_FRAMEWORK acceptor are left out of the "frameworks" array.
Future<Response> Master::Http::stateSummary(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  Future<Owned<AuthorizationAcceptor>> authorizeRole =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_ROLE);
  Future<Owned<AuthorizationAcceptor>> authorizeFrameworkInfo =
    AuthorizationAcceptor::create(
        principal, master->authorizer, authorization::VIEW_FRAMEWORK);

  return collect(authorizeRole, authorizeFrameworkInfo).then(defer(
      master->self(),
      [this, request](const tuple<Owned<AuthorizationAcceptor>,
                                  Owned<AuthorizationAcceptor>>& acceptors)
          -> Response {
        // Same pattern as the `state` writer in `state()`: `acceptors` is
        // captured by reference, and the writer is handed to `jsonify`
        // in the `return` statement of this enclosing lambda.
        auto stateSummary = [this, &acceptors](JSON::ObjectWriter* writer) {
          Owned<AuthorizationAcceptor> authorizeRole;
          Owned<AuthorizationAcceptor> authorizeFrameworkInfo;
          tie(authorizeRole, authorizeFrameworkInfo) = acceptors;

          writer->field("hostname", master->info().hostname());

          if (master->flags.cluster.isSome()) {
            writer->field("cluster", master->flags.cluster.get());
          }

          // We use the tasks in the 'Frameworks' struct to compute summaries
          // for this endpoint. This is done 1) for consistency between the
          // 'slaves' and 'frameworks' subsections below 2) because we want to
          // provide summary information for frameworks that are currently
          // registered 3) the frameworks keep a circular buffer of completed
          // tasks that we can use to keep a limited view on the history of
          // recent completed / failed tasks.

          // Generate mappings from 'slave' to 'framework' and reverse.
          SlaveFrameworkMapping slaveFrameworkMapping(
              master->frameworks.registered);

          // Generate 'TaskState' summaries for all framework and slave ids.
          TaskStateSummaries taskStateSummaries(master->frameworks.registered);

          // Model all of the slaves.
          writer->field(
              "slaves",
              [this,
               &slaveFrameworkMapping,
               &taskStateSummaries,
               &authorizeRole](JSON::ArrayWriter* writer) {
                foreachvalue (Slave* slave, master->slaves.registered) {
                  writer->element(
                      [&slave,
                       &slaveFrameworkMapping,
                       &taskStateSummaries,
                       &authorizeRole](JSON::ObjectWriter* writer) {
                        // Emit the regular slave fields first, then append
                        // the summary fields to the same JSON object.
                        SlaveWriter slaveWriter(*slave, authorizeRole);
                        slaveWriter(writer);

                        // Add the 'TaskState' summary for this slave.
                        const TaskStateSummary& summary =
                            taskStateSummaries.slave(slave->id);

                        // Certain per-agent status totals will always be zero
                        // (e.g., TASK_ERROR, TASK_UNREACHABLE). We report them
                        // here anyway, for completeness.
                        //
                        // TODO(neilc): Update for TASK_GONE and
                        // TASK_GONE_BY_OPERATOR.
                        //
                        // NOTE: `TaskStateSummary` also counts `dropped`,
                        // `gone`, `gone_by_operator` and `unknown`, but
                        // those counters are not emitted below.
                        writer->field("TASK_STAGING", summary.staging);
                        writer->field("TASK_STARTING", summary.starting);
                        writer->field("TASK_RUNNING", summary.running);
                        writer->field("TASK_KILLING", summary.killing);
                        writer->field("TASK_FINISHED", summary.finished);
                        writer->field("TASK_KILLED", summary.killed);
                        writer->field("TASK_FAILED", summary.failed);
                        writer->field("TASK_LOST", summary.lost);
                        writer->field("TASK_ERROR", summary.error);
                        writer->field("TASK_UNREACHABLE", summary.unreachable);

                        // Add the ids of all the frameworks running on this
                        // slave.
                        const hashset<FrameworkID>& frameworks =
                            slaveFrameworkMapping.frameworks(slave->id);

                        writer->field(
                            "framework_ids",
                            [&frameworks](JSON::ArrayWriter* writer) {
                              foreach (
                                  const FrameworkID& frameworkId,
                                  frameworks) {
                                writer->element(frameworkId.value());
                              }
                            });
                      });
                }
              });

          // Model all of the frameworks.
          writer->field(
              "frameworks",
              [this,
               &slaveFrameworkMapping,
               &taskStateSummaries,
               &authorizeFrameworkInfo](JSON::ArrayWriter* writer) {
                foreachpair (const FrameworkID& frameworkId,
                             Framework* framework,
                             master->frameworks.registered) {
                  // Skip unauthorized frameworks.
                  if (!authorizeFrameworkInfo->accept(framework->info)) {
                    continue;
                  }

                  writer->element(
                      [&frameworkId,
                       &framework,
                       &slaveFrameworkMapping,
                       &taskStateSummaries](JSON::ObjectWriter* writer) {
                        json(writer, Summary<Framework>(*framework));

                        // Add the 'TaskState' summary for this framework.
                        const TaskStateSummary& summary =
                            taskStateSummaries.framework(frameworkId);

                        // TODO(neilc): Update for TASK_GONE and
                        // TASK_GONE_BY_OPERATOR.
                        //
                        // NOTE: as for the slaves, the `dropped`, `gone`,
                        // `gone_by_operator` and `unknown` counters are
                        // not emitted.
                        writer->field("TASK_STAGING", summary.staging);
                        writer->field("TASK_STARTING", summary.starting);
                        writer->field("TASK_RUNNING", summary.running);
                        writer->field("TASK_KILLING", summary.killing);
                        writer->field("TASK_FINISHED", summary.finished);
                        writer->field("TASK_KILLED", summary.killed);
                        writer->field("TASK_FAILED", summary.failed);
                        writer->field("TASK_LOST", summary.lost);
                        writer->field("TASK_ERROR", summary.error);
                        writer->field("TASK_UNREACHABLE", summary.unreachable);

                        // Add the ids of all the slaves running this framework.
                        const hashset<SlaveID>& slaves =
                            slaveFrameworkMapping.slaves(frameworkId);

                        writer->field(
                            "slave_ids",
                            [&slaves](JSON::ArrayWriter* writer) {
                              foreach (const SlaveID& slaveId, slaves) {
                                writer->element(slaveId.value());
                              }
                            });
                      });
                }
              });
        };

        // An optional "jsonp" query parameter is passed through to `OK`.
        return OK(jsonify(stateSummary), request.url.query.get("jsonp"));
      }));
}


// Builds the JSON representation of a role: its name, its weight (1.0
// when none is given), its quota (only when one is given), and the
// allocated resources and framework ids of the role (both empty when no
// `Role` object is given).
JSON::Object model(
    const string& name,
    Option<double> weight,
    Option<Quota> quota,
    Option<Role*> _role)
{
  JSON::Object object;
  object.values["name"] = name;

  // Fall back to the default weight when none was configured.
  object.values["weight"] = weight.isSome() ? weight.get() : 1.0;

  if (quota.isSome()) {
    object.values["quota"] = model(quota->info);
  }

  if (_role.isSome()) {
    Role* role = _role.get();

    JSON::Array frameworkIds;
    foreachkey (const FrameworkID& frameworkId, role->frameworks) {
      frameworkIds.values.push_back(frameworkId.value());
    }

    object.values["resources"] = model(role->allocatedResources());
    object.values["frameworks"] = std::move(frameworkIds);
  } else {
    object.values["resources"] = model(Resources());
    object.values["frameworks"] = JSON::Array();
  }

  return object;
}


// Builds the help text for the roles endpoint. The AUTHORIZATION section
// documents that the returned roles are filtered per user, which is what
// `_roles()` does with its VIEW_ROLE approver.
string Master::Http::ROLES_HELP()
{
  return HELP(
    TLDR(
        "Information about roles."),
    DESCRIPTION(
        "Returns 200 OK when information about roles was queried successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "This endpoint provides information about roles as a JSON object.",
        "It returns information about every role that is on the role",
        "whitelist (if enabled), has one or more registered frameworks,",
        "or has a non-default weight or quota. For each role, it returns",
        "the weight, total allocated resources, and registered frameworks."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "This endpoint might be filtered based on the user accessing it.",
        "For example a user might only see the subset of roles",
        "they are allowed to view.",
        "See the authorization documentation for details."));
}


// Computes the names of the roles that the given principal is allowed to
// view, in sorted order.
//
// With an authorizer configured, a VIEW_ROLE object approver is requested
// for the principal; without one, every role is approved. The candidate
// roles are then gathered and filtered on the master's context.
Future<vector<string>> Master::Http::_roles(
    const Option<Principal>& principal) const
{
  // Retrieve `ObjectApprover`s for authorizing roles.
  Future<Owned<ObjectApprover>> rolesApprover;

  if (master->authorizer.isSome()) {
    Option<authorization::Subject> subject = createSubject(principal);

    rolesApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_ROLE);
  } else {
    rolesApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  }

  return rolesApprover
    .then(defer(master->self(),
        [this](const Owned<ObjectApprover>& rolesApprover)
          -> vector<string> {
      // Compute the role names to return results for. When an explicit
      // role whitelist has been configured, we use that list of names.
      // When using implicit roles, the right behavior is a bit more
      // subtle. There are no constraints on possible role names, so we
      // instead list all the "interesting" roles: all roles with one or
      // more registered frameworks, and all roles with a non-default
      // weight or quota.
      //
      // NOTE: we use a `std::set` to store the role names to ensure a
      // deterministic output order.
      set<string> roleList;
      if (master->roleWhitelist.isSome()) {
        const hashset<string>& whitelist = master->roleWhitelist.get();
        roleList.insert(whitelist.begin(), whitelist.end());
      } else {
        hashset<string> roles = master->roles.keys();
        roleList.insert(roles.begin(), roles.end());

        hashset<string> weights = master->weights.keys();
        roleList.insert(weights.begin(), weights.end());

        hashset<string> quotas = master->quotas.keys();
        roleList.insert(quotas.begin(), quotas.end());
      }

      // Keep only the roles the approver lets this principal view.
      vector<string> filteredRoleList;
      filteredRoleList.reserve(roleList.size());

      foreach (const string& role, roleList) {
        if (approveViewRole(rolesApprover, role)) {
          filteredRoleList.push_back(role);
        }
      }

      return filteredRoleList;
    }));
}


// Produces the JSON list of roles described by `ROLES_HELP()`.
//
// The request is rejected with `Forbidden` when the principal carries
// claims but no value, and redirected when this master is not elected.
// Otherwise every role returned by `_roles()` is modeled together with
// its weight, quota and `Role` object, whichever of those exist.
Future<Response> Master::Http::roles(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  return _roles(principal)
    .then(defer(master->self(),
        [this, request](const vector<string>& filteredRoles)
          -> Response {
      JSON::Object object;

      {
        JSON::Array array;

        foreach (const string& name, filteredRoles) {
          // `hashmap::get()` yields `None` for a missing key, so each map
          // is probed only once and never through the mutating
          // `operator[]`.
          array.values.push_back(model(
              name,
              master->weights.get(name),
              master->quotas.get(name),
              master->roles.get(name)));
        }

        object.values["roles"] = std::move(array);
      }

      // An optional "jsonp" query parameter is passed through to `OK`.
      return OK(object, request.url.query.get("jsonp"));
    }));
}


// Serves the `LIST_FILES` call by delegating to `master->files->browse()`
// and translating the outcome into an HTTP response: a `FilesError` is
// mapped onto the matching error response, while a successful listing is
// returned as a `LIST_FILES` response serialized in the requested content
// type.
Future<Response> Master::Http::listFiles(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::LIST_FILES, call.type());

  const string& path = call.list_files().path();

  return master->files->browse(path, principal)
    .then([contentType](const Try<list<FileInfo>, FilesError>& result)
      -> Future<Response> {
      if (!result.isError()) {
        mesos::master::Response response;
        response.set_type(mesos::master::Response::LIST_FILES);

        mesos::master::Response::ListFiles* listing =
          response.mutable_list_files();

        // Copy every returned entry into the response.
        foreach (const FileInfo& fileInfo, result.get()) {
          listing->add_file_infos()->CopyFrom(fileInfo);
        }

        return OK(serialize(contentType, evolve(response)),
                  stringify(contentType));
      }

      const FilesError& error = result.error();

      // No default case, so that a new error type triggers a compiler
      // warning here.
      switch (error.type) {
        case FilesError::Type::INVALID:
          return BadRequest(error.message);

        case FilesError::Type::UNAUTHORIZED:
          return Forbidden(error.message);

        case FilesError::Type::NOT_FOUND:
          return NotFound(error.message);

        case FilesError::Type::UNKNOWN:
          return InternalServerError(error.message);
      }

      UNREACHABLE();
    });
}


// This duplicates the functionality offered by `roles()`. This was necessary
// as the JSON object returned by `roles()` was not specified in a formal way
// i.e. via a corresponding protobuf object and would have been very hard to
// convert back into a `Resource` object.
//
// Every role returned by `_roles()` is reported with its name and weight
// (1.0 when no weight is configured). Roles that have a `Role` object in
// the master additionally carry their allocated resources and framework
// ids.
Future<Response> Master::Http::getRoles(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_ROLES, call.type());

  return _roles(principal)
    .then(defer(master->self(),
        [this, contentType](const vector<string>& filteredRoles)
          -> Response {
      mesos::master::Response response;
      response.set_type(mesos::master::Response::GET_ROLES);

      mesos::master::Response::GetRoles* getRoles =
        response.mutable_get_roles();

      foreach (const string& name, filteredRoles) {
        // Fill the entry in place instead of copying a temporary into
        // the response.
        mesos::Role* role = getRoles->add_roles();
        role->set_name(name);

        // `hashmap::get()` yields `None` for a missing key, so each map
        // is probed only once and never through the mutating `operator[]`.
        const Option<double> weight = master->weights.get(name);
        role->set_weight(weight.isSome() ? weight.get() : 1.0);

        const Option<Role*> role_ = master->roles.get(name);
        if (role_.isSome()) {
          role->mutable_resources()->CopyFrom(
              role_.get()->allocatedResources());

          foreachkey (const FrameworkID& frameworkId,
                      role_.get()->frameworks) {
            role->add_frameworks()->CopyFrom(frameworkId);
          }
        }
      }

      return OK(serialize(contentType, evolve(response)),
                stringify(contentType));
    }));
}


// Builds the help text for the framework teardown endpoint: a one-line
// summary, the possible response codes, the required "frameworkId"
// value, and the authentication / authorization requirements.
string Master::Http::TEARDOWN_HELP()
{
  return HELP(
    TLDR(
        "Tears down a running framework by shutting down all tasks/executors "
        "and removing the framework."),
    DESCRIPTION(
        "Returns 200 OK if the framework was torn down successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "Please provide a \"frameworkId\" value designating the running",
        "framework to tear down."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "Using this endpoint to teardown frameworks requires that the",
        "current principal is authorized to teardown frameworks created",
        "by the principal who created the framework.",
        "See the authorization documentation for details."));
}


// Handles a framework teardown request (see `TEARDOWN_HELP()`).
//
// The checks run in this order: principal sanity, leadership (redirect),
// HTTP method (POST only), body decoding, presence of 'frameworkId', and
// existence of the framework. Without an authorizer the framework is torn
// down right away; otherwise a TEARDOWN_FRAMEWORK authorization request
// is issued first and the teardown only happens when it is granted.
Future<Response> Master::Http::teardown(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // Since this is a POST, the framework ID to shut down is carried as a
  // query string inside the request body.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  Option<string> frameworkIdValue = decoded.get().get("frameworkId");
  if (frameworkIdValue.isNone()) {
    return BadRequest(
        "Missing 'frameworkId' query parameter in the request body");
  }

  FrameworkID id;
  id.set_value(frameworkIdValue.get());

  Framework* framework = master->getFramework(id);

  if (framework == nullptr) {
    return BadRequest("No framework found with specified ID");
  }

  // Skip authorization if no ACLs were provided to the master.
  if (master->authorizer.isNone()) {
    return _teardown(id);
  }

  authorization::Request authorizationRequest;
  authorizationRequest.set_action(authorization::TEARDOWN_FRAMEWORK);

  Option<authorization::Subject> subject = createSubject(principal);
  if (subject.isSome()) {
    authorizationRequest.mutable_subject()->CopyFrom(subject.get());
  }

  // The object is only populated for frameworks that have a principal.
  if (framework->info.has_principal()) {
    authorization::Object* object = authorizationRequest.mutable_object();
    object->mutable_framework_info()->CopyFrom(framework->info);
    object->set_value(framework->info.principal());
  }

  return master->authorizer.get()->authorized(authorizationRequest)
    .then(defer(
        master->self(),
        [this, id](bool authorized) -> Future<Response> {
          if (authorized) {
            return _teardown(id);
          }

          return Forbidden();
        }));
}


// Removes the framework with the given ID from the master and answers
// `OK`. The framework is looked up again here; if it cannot be found,
// `BadRequest` is returned instead.
Future<Response> Master::Http::_teardown(const FrameworkID& id) const
{
  Framework* framework = master->getFramework(id);

  if (framework != nullptr) {
    // TODO(ijimenez): Do 'removeFramework' asynchronously.
    master->removeFramework(framework);

    return OK();
  }

  return BadRequest("No framework found with ID " + stringify(id));
}


// Orderings over tasks, keyed on the timestamp of each task's first
// status (`statuses(0)`). Tasks that carry no status at all are grouped
// together: two such tasks compare as equivalent under both orderings.
struct TaskComparator
{
  // Returns true if `lhs` must be placed before `rhs` when sorting from
  // the smallest to the largest timestamp. A task without statuses is
  // placed before any task that has one.
  static bool ascending(const Task* lhs, const Task* rhs)
  {
    const bool lhsEmpty = lhs->statuses().size() == 0;
    const bool rhsEmpty = rhs->statuses().size() == 0;

    if (lhsEmpty || rhsEmpty) {
      // Only "no status" versus "has a status" is a strict ordering.
      return lhsEmpty && !rhsEmpty;
    }

    return lhs->statuses(0).timestamp() < rhs->statuses(0).timestamp();
  }

  // Returns true if `lhs` must be placed before `rhs` when sorting from
  // the largest to the smallest timestamp. A task without statuses is
  // placed after any task that has one.
  static bool descending(const Task* lhs, const Task* rhs)
  {
    const bool lhsEmpty = lhs->statuses().size() == 0;
    const bool rhsEmpty = rhs->statuses().size() == 0;

    if (lhsEmpty || rhsEmpty) {
      // Only "has a status" versus "no status" is a strict ordering.
      return rhsEmpty && !lhsEmpty;
    }

    return lhs->statuses(0).timestamp() > rhs->statuses(0).timestamp();
  }
};


// Builds the help text for the tasks listing endpoint: a one-line summary,
// the possible response codes, the supported query parameters, and the
// authentication / authorization notes.
string Master::Http::TASKS_HELP()
{
  return HELP(
    TLDR(
        "Lists tasks from all active frameworks."),
    DESCRIPTION(
        "Returns 200 OK when task information was queried successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "Lists known tasks.",
        "The information shown might be filtered based on the user",
        "accessing the endpoint.",
        "",
        "Query parameters:",
        "",
        ">        framework_id=VALUE   Only return tasks belonging to the "
        "framework with this ID.",
        // The default shown to the user is rendered from `TASK_LIMIT`, so
        // the help text follows the constant.
        ">        limit=VALUE          Maximum number of tasks returned "
        "(default is " + stringify(TASK_LIMIT) + ").",
        ">        offset=VALUE         Starts task list at offset.",
        ">        order=(asc|desc)     Ascending or descending sort order "
        "(default is descending).",
        // NOTE: The trailing `""` below is not separated by a comma, so it
        // is concatenated onto the preceding literal rather than being
        // passed as an extra (empty) line.
        ">        task_id=VALUE        Only return tasks with this ID "
        "(should be used together with parameter 'framework_id')."
        ""),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "This endpoint might be filtered based on the user accessing it.",
        "For example a user might only see the subset of tasks they are",
        "allowed to view.",
        "See the authorization documentation for details."));
}


// Handler for the tasks listing endpoint described by `TASKS_HELP()`.
//
// Replies with a JSON object holding a "tasks" array built from the
// active, unreachable and completed tasks of all registered and completed
// frameworks that match the optional `framework_id` / `task_id` query
// parameters and that pass the `VIEW_FRAMEWORK` / `VIEW_TASK` acceptors
// created for `principal`. The tasks are sorted by the timestamp of their
// first status (`order`, descending unless "asc" is given) and then
// sliced according to `offset` and `limit`.
//
// Replies with 403 Forbidden if `principal` carries claims but no value,
// and redirects to the leading master if this master is not elected.
Future<Response> Master::Http::tasks(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  // Get list options (limit and offset).
  // NOTE: Both values are parsed as `int` but stored as `size_t`, so a
  // negative value is converted to a very large unsigned number. The
  // slicing code below is written so that such values cannot wrap around.
  Result<int> result = numify<int>(request.url.query.get("limit"));
  size_t limit = result.isSome() ? result.get() : TASK_LIMIT;

  result = numify<int>(request.url.query.get("offset"));
  size_t offset = result.isSome() ? result.get() : 0;

  Option<string> order = request.url.query.get("order");
  string _order = order.isSome() && (order.get() == "asc") ? "asc" : "des";

  // Extracted up front so that the continuation below copies only this
  // value rather than the whole request.
  Option<string> jsonp = request.url.query.get("jsonp");

  Future<Owned<AuthorizationAcceptor>> authorizeFrameworkInfo =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_FRAMEWORK);
  Future<Owned<AuthorizationAcceptor>> authorizeTask =
    AuthorizationAcceptor::create(
        principal,
        master->authorizer,
        authorization::VIEW_TASK);
  Future<IDAcceptor<FrameworkID>> selectFrameworkId =
    IDAcceptor<FrameworkID>(request.url.query.get("framework_id"));
  Future<IDAcceptor<TaskID>> selectTaskId =
    IDAcceptor<TaskID>(request.url.query.get("task_id"));

  return collect(
      authorizeFrameworkInfo,
      authorizeTask,
      selectFrameworkId,
      selectTaskId)
    .then(defer(
        master->self(),
        [=](const tuple<Owned<AuthorizationAcceptor>,
                        Owned<AuthorizationAcceptor>,
                        IDAcceptor<FrameworkID>,
                        IDAcceptor<TaskID>>& acceptors)-> Future<Response> {
          Owned<AuthorizationAcceptor> authorizeFrameworkInfo;
          Owned<AuthorizationAcceptor> authorizeTask;
          IDAcceptor<FrameworkID> selectFrameworkId;
          IDAcceptor<TaskID> selectTaskId;
          tie(authorizeFrameworkInfo,
              authorizeTask,
              selectFrameworkId,
              selectTaskId) = acceptors;

          // Construct framework list with both active and completed frameworks.
          vector<const Framework*> frameworks;
          foreachvalue (Framework* framework, master->frameworks.registered) {
            // Skip unauthorized frameworks or frameworks without matching
            // framework ID.
            if (!selectFrameworkId.accept(framework->id()) ||
                !authorizeFrameworkInfo->accept(framework->info)) {
              continue;
            }

            frameworks.push_back(framework);
          }

          foreachvalue (const Owned<Framework>& framework,
                        master->frameworks.completed) {
            // Skip unauthorized frameworks or frameworks without matching
            // framework ID.
            if (!selectFrameworkId.accept(framework->id()) ||
                !authorizeFrameworkInfo->accept(framework->info)) {
              continue;
            }

            frameworks.push_back(framework.get());
          }

          // Construct task list with both running,
          // completed and unreachable tasks.
          vector<const Task*> tasks;
          foreach (const Framework* framework, frameworks) {
            foreachvalue (Task* task, framework->tasks) {
              CHECK_NOTNULL(task);
              // Skip unauthorized tasks or tasks without matching task ID.
              if (!selectTaskId.accept(task->task_id()) ||
                  !authorizeTask->accept(*task, framework->info)) {
                continue;
              }

              tasks.push_back(task);
            }

            foreachvalue (
                const Owned<Task>& task,
                framework->unreachableTasks) {
              // Skip unauthorized tasks or tasks without matching task ID.
              if (!selectTaskId.accept(task.get()->task_id()) ||
                  !authorizeTask->accept(*task.get(), framework->info)) {
                continue;
              }

              tasks.push_back(task.get());
            }

            foreach (const Owned<Task>& task, framework->completedTasks) {
              // Skip unauthorized tasks or tasks without matching task ID.
              if (!selectTaskId.accept(task.get()->task_id()) ||
                  !authorizeTask->accept(*task.get(), framework->info)) {
                continue;
              }

              tasks.push_back(task.get());
            }
          }

          // Sort tasks by task status timestamp. Default order is descending.
          // The earliest timestamp is chosen for comparison when
          // multiple are present.
          if (_order == "asc") {
            sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
          } else {
            sort(tasks.begin(), tasks.end(), TaskComparator::descending);
          }

          auto tasksWriter =
            [&tasks, limit, offset](JSON::ObjectWriter* writer) {
            writer->field("tasks",
                          [&tasks, limit, offset](JSON::ArrayWriter* writer) {
              // Collect at most 'limit' tasks starting from 'offset'.
              // The bounds are derived without computing `offset + limit`,
              // because that sum wraps around for very large values and
              // would then produce an empty page.
              size_t begin = std::min(offset, tasks.size());
              size_t end = begin + std::min(limit, tasks.size() - begin);
              for (size_t i = begin; i < end; i++) {
                writer->element(*tasks[i]);
              }
            });
          };

          return OK(jsonify(tasksWriter), jsonp);
  }));
}


// Handler for the `GET_TASKS` call.
//
// Obtains object approvers for `VIEW_FRAMEWORK` and `VIEW_TASK` (or
// approvers that accept everything when the master has no authorizer),
// and, once both are available, replies with 200 OK carrying a
// `GET_TASKS` response serialized as `contentType`.
Future<Response> Master::Http::getTasks(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());

  // Retrieve Approvers for authorizing frameworks and tasks.
  Future<Owned<ObjectApprover>> frameworksApprover;
  Future<Owned<ObjectApprover>> tasksApprover;

  if (master->authorizer.isNone()) {
    // No authorizer configured: nothing is filtered out.
    frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
    tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    frameworksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_FRAMEWORK);

    tasksApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::VIEW_TASK);
  }

  return collect(frameworksApprover, tasksApprover)
    .then(defer(
        master->self(),
        [=](const tuple<Owned<ObjectApprover>,
                        Owned<ObjectApprover>>& approvers)
        -> Future<Response> {
      // Unpack the resolved approvers.
      Owned<ObjectApprover> viewFrameworks;
      Owned<ObjectApprover> viewTasks;
      tie(viewFrameworks, viewTasks) = approvers;

      mesos::master::Response reply;
      reply.set_type(mesos::master::Response::GET_TASKS);
      reply.mutable_get_tasks()->CopyFrom(
          _getTasks(viewFrameworks, viewTasks));

      return OK(serialize(contentType, evolve(reply)),
                stringify(contentType));
  }));
}


// Builds the `GetTasks` message from the master's current state.
//
// Only registered and completed frameworks accepted by
// `frameworksApprover` are considered. For each of them the pending,
// active, unreachable and completed tasks accepted by `tasksApprover`
// are copied into the corresponding repeated field of the result.
mesos::master::Response::GetTasks Master::Http::_getTasks(
    const Owned<ObjectApprover>& frameworksApprover,
    const Owned<ObjectApprover>& tasksApprover) const
{
  // Construct framework list with both active and completed frameworks.
  vector<const Framework*> frameworks;
  foreachvalue (Framework* framework, master->frameworks.registered) {
    // Skip unauthorized frameworks.
    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
      continue;
    }

    frameworks.push_back(framework);
  }

  foreachvalue (const Owned<Framework>& framework,
                master->frameworks.completed) {
    // Skip unauthorized frameworks.
    if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
      continue;
    }

    frameworks.push_back(framework.get());
  }

  mesos::master::Response::GetTasks getTasks;

  foreach (const Framework* framework, frameworks) {
    // Pending tasks.
    foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
      // Skip unauthorized tasks.
      if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
        continue;
      }

      // Pending tasks only exist as `TaskInfo`, so a `Task` in the
      // `TASK_STAGING` state is created for the response.
      const Task& task =
        protobuf::createTask(taskInfo, TASK_STAGING, framework->id());

      getTasks.add_pending_tasks()->CopyFrom(task);
    }

    // Active tasks.
    foreachvalue (Task* task, framework->tasks) {
      CHECK_NOTNULL(task);
      // Skip unauthorized tasks.
      if (!approveViewTask(tasksApprover, *task, framework->info)) {
        continue;
      }

      getTasks.add_tasks()->CopyFrom(*task);
    }

    // Unreachable tasks.
    foreachvalue (const Owned<Task>& task, framework->unreachableTasks) {
      // Skip unauthorized tasks.
      if (!approveViewTask(tasksApprover, *task, framework->info)) {
        continue;
      }

      getTasks.add_unreachable_tasks()->CopyFrom(*task);
    }

    // Completed tasks.
    foreach (const Owned<Task>& task, framework->completedTasks) {
      // Skip unauthorized tasks.
      if (!approveViewTask(tasksApprover, *task, framework->info)) {
        continue;
      }

      getTasks.add_completed_tasks()->CopyFrom(*task);
    }
  }

  return getTasks;
}


// /master/maintenance/schedule endpoint help.
//
// Builds the help text for the endpoint: a one-line summary, the possible
// response codes, the behavior of GET and POST, and the authorization
// rules that apply to each of the two methods.
string Master::Http::MAINTENANCE_SCHEDULE_HELP()
{
  return HELP(
    TLDR(
        "Returns or updates the cluster's maintenance schedule."),
    DESCRIPTION(
        "Returns 200 OK when the requested maintenance operation was performed",
        "successfully.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "GET: Returns the current maintenance schedule as JSON.",
        "",
        "POST: Validates the request body as JSON",
        "and updates the maintenance schedule."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "GET: The response will contain only the maintenance schedule for",
        "those machines the current principal is allowed to see. If none",
        "an empty response will be returned.",
        "",
        "POST: The current principal must be authorized to modify the",
        "maintenance schedule of all the machines in the request. If the",
        "principal is unauthorized to modify the schedule for at least one",
        "machine, the whole request will fail."));
}


// /master/maintenance/schedule endpoint handler.
//
// GET replies with the current maintenance schedule as JSON, restricted
// to the machines the principal is approved to see. POST parses the
// request body as a JSON `mesos::maintenance::Schedule` and hands it to
// `_updateMaintenanceSchedule`. Any other method is rejected, and a
// non-leading master redirects to the leader.
Future<Response> Master::Http::maintenanceSchedule(
    const Request& request,
    const Option<Principal>& principal) const
{
  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  const bool isGet = request.method == "GET";
  const bool isPost = request.method == "POST";

  if (!isGet && !isPost) {
    return MethodNotAllowed({"GET", "POST"}, request.method);
  }

  if (isPost) {
    // Parse the POST body as JSON.
    Try<JSON::Object> body = JSON::parse<JSON::Object>(request.body);
    if (body.isError()) {
      return BadRequest(body.error());
    }

    // Convert the schedule to a protobuf.
    Try<mesos::maintenance::Schedule> parsed =
      ::protobuf::parse<mesos::maintenance::Schedule>(body.get());

    if (parsed.isError()) {
      return BadRequest(parsed.error());
    }

    return _updateMaintenanceSchedule(parsed.get(), principal);
  }

  // GET: JSON-ify and return the current maintenance schedule.
  Future<Owned<ObjectApprover>> approver;

  if (master->authorizer.isNone()) {
    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    approver = master->authorizer.get()->getObjectApprover(
        subject, authorization::GET_MAINTENANCE_SCHEDULE);
  }

  Option<string> jsonp = request.url.query.get("jsonp");

  return approver.then(defer(
      master->self(),
      [this, jsonp](
          const Owned<ObjectApprover>& approver) -> Future<Response> {
        return OK(JSON::protobuf(_getMaintenanceSchedule(approver)), jsonp);
      }));
}


// Returns a copy of the master's first maintenance schedule, reduced to
// the machines that `approver` approves.
//
// Machines that are rejected, or whose authorization check fails with an
// error, are left out. A window that ends up with no machines is dropped
// entirely. An empty schedule is returned if the master has none.
mesos::maintenance::Schedule Master::Http::_getMaintenanceSchedule(
    const Owned<ObjectApprover>& approver) const
{
  mesos::maintenance::Schedule visible;

  // TODO(josephw): Return more than one schedule.
  if (master->maintenance.schedules.empty()) {
    return visible;
  }

  foreach (const mesos::maintenance::Window& original,
           master->maintenance.schedules.front().windows()) {
    mesos::maintenance::Window filtered;

    foreach (const MachineID& machineId, original.machine_ids()) {
      Try<bool> verdict =
        approver->approved(ObjectApprover::Object(machineId));

      if (verdict.isError()) {
        LOG(WARNING) << "Error during MachineID authorization: "
                     << verdict.error();
        // TODO(arojas): Consider exposing these errors to the caller.
        continue;
      }

      if (verdict.get()) {
        filtered.add_machine_ids()->CopyFrom(machineId);
      }
    }

    // Nothing in this window is visible to the caller.
    if (filtered.machine_ids_size() == 0) {
      continue;
    }

    filtered.mutable_unavailability()->CopyFrom(original.unavailability());
    visible.add_windows()->CopyFrom(filtered);
  }

  return visible;
}


// First stage of a maintenance schedule update: validates `schedule`
// against the master's machines and obtains the object approver for
// `UPDATE_MAINTENANCE_SCHEDULE` (or one that accepts everything when no
// authorizer is configured), then continues in
// `__updateMaintenanceSchedule`.
//
// Replies with 400 Bad Request if the schedule does not validate.
Future<Response> Master::Http::_updateMaintenanceSchedule(
    const mesos::maintenance::Schedule& schedule,
    const Option<process::http::authentication::Principal>& principal) const
{
  // Validate that the schedule only transitions machines between
  // `UP` and `DRAINING` modes.
  Try<Nothing> validation =
    maintenance::validation::schedule(schedule, master->machines);

  if (validation.isError()) {
    return BadRequest(validation.error());
  }

  Future<Owned<ObjectApprover>> approver;

  if (master->authorizer.isNone()) {
    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    approver = master->authorizer.get()->getObjectApprover(
        subject, authorization::UPDATE_MAINTENANCE_SCHEDULE);
  }

  return approver.then(defer(
      master->self(),
      [this, schedule](const Owned<ObjectApprover>& resolved) {
        return __updateMaintenanceSchedule(schedule, resolved);
      }));
}

// Second stage of a maintenance schedule update: checks that `approver`
// approves every machine named in `schedule` and then applies the update
// through the registrar, continuing in `___updateMaintenanceSchedule`.
//
// Replies with 500 Internal Server Error if an authorization check
// fails with an error, and with 403 Forbidden as soon as one machine is
// not approved.
Future<Response> Master::Http::__updateMaintenanceSchedule(
    const mesos::maintenance::Schedule& schedule,
    const Owned<ObjectApprover>& approver) const
{
  foreach (const mesos::maintenance::Window& window, schedule.windows()) {
    foreach (const MachineID& machineId, window.machine_ids()) {
      Try<bool> verdict =
        approver->approved(ObjectApprover::Object(machineId));

      if (verdict.isError()) {
        return InternalServerError("Authorization error: " + verdict.error());
      }

      if (!verdict.get()) {
        return Forbidden();
      }
    }
  }

  return master->registrar
    ->apply(Owned<Operation>(new maintenance::UpdateSchedule(schedule)))
    .then(defer(
        master->self(),
        [this, schedule](bool applied) {
          return ___updateMaintenanceSchedule(schedule, applied);
        }));
}

// Final stage of a maintenance schedule update: brings the master's
// in-memory machine state and stored schedule in line with `schedule`
// after the registrar operation has completed, then replies with 200 OK.
//
// `applied` is the result of the registrar operation and is required to
// be true.
Future<Response> Master::Http::___updateMaintenanceSchedule(
    const mesos::maintenance::Schedule& schedule,
    bool applied) const
{
  // See the top comment in "master/maintenance.hpp" for why this check
  // is here, and is appropriate.
  CHECK(applied);

  // Update the master's local state with the new schedule.
  // NOTE: We only add or remove differences between the current schedule
  // and the new schedule.  This is because the `MachineInfo` struct
  // holds more information than a maintenance schedule.
  // For example, the `mode` field is not part of a maintenance schedule.

  // TODO(josephw): allow more than one schedule.

  // Put the machines in the updated schedule into a map, keyed by machine.
  // Save the unavailability, to help with updating some machines.
  // If a machine appears in more than one window, the last window wins.
  hashmap<MachineID, Unavailability> unavailabilities;
  foreach (const mesos::maintenance::Window& window, schedule.windows()) {
    foreach (const MachineID& id, window.machine_ids()) {
      unavailabilities[id] = window.unavailability();
    }
  }

  // NOTE: Copies are needed because `updateUnavailability()` in this loop
  // modifies the container.
  foreachkey (const MachineID& id, utils::copy(master->machines)) {
    // Update the `unavailability` for each existing machine, except for
    // machines going from `UP` to `DRAINING` (handled in the next loop).
    // Each machine will only be touched by 1 of the 2 loops here to
    // avoid sending inverse offer twice for a single machine since
    // `updateUnavailability` will trigger an inverse offer.
    // TODO(gyliu513): Merge this logic with `Master::updateUnavailability`,
    // having it in two places results in more conditionals to handle.
    if (unavailabilities.contains(id)) {
      if (master->machines[id].info.mode() == MachineInfo::UP) {
        continue;
      }

      master->updateUnavailability(id, unavailabilities[id]);
      continue;
    }

    // Transition each removed machine back to the `UP` mode and remove the
    // unavailability.
    master->machines[id].info.set_mode(MachineInfo::UP);
    master->updateUnavailability(id, None());
  }

  // Save each new machine, with the unavailability
  // and starting in `DRAINING` mode.
  foreach (const mesos::maintenance::Window& window, schedule.windows()) {
    foreach (const MachineID& id, window.machine_ids()) {
      // Machines that are already known and not `UP` were handled by the
      // loop above.
      if (master->machines.contains(id) &&
          master->machines[id].info.mode() != MachineInfo::UP) {
        continue;
      }

      MachineInfo info;
      info.mutable_id()->CopyFrom(id);
      info.set_mode(MachineInfo::DRAINING);

      master->machines[id].info.CopyFrom(info);

      master->updateUnavailability(id, window.unavailability());
    }
  }

  // Replace the old schedule(s) with the new schedule.
  master->maintenance.schedules.clear();
  master->maintenance.schedules.push_back(schedule);

  return OK();
}


// Handler for the `GET_MAINTENANCE_SCHEDULE` call.
//
// Obtains the object approver for `GET_MAINTENANCE_SCHEDULE` (or one that
// accepts everything when no authorizer is configured) and replies with
// 200 OK carrying the filtered schedule serialized as `contentType`.
Future<Response> Master::Http::getMaintenanceSchedule(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_MAINTENANCE_SCHEDULE, call.type());

  Future<Owned<ObjectApprover>> approver;

  if (master->authorizer.isNone()) {
    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    approver = master->authorizer.get()->getObjectApprover(
        subject, authorization::GET_MAINTENANCE_SCHEDULE);
  }

  return approver.then(defer(
      master->self(),
      [this, contentType](
          const Owned<ObjectApprover>& approver) -> Future<Response> {
        mesos::master::Response reply;
        reply.set_type(mesos::master::Response::GET_MAINTENANCE_SCHEDULE);

        mesos::maintenance::Schedule* target =
          reply.mutable_get_maintenance_schedule()->mutable_schedule();

        target->CopyFrom(_getMaintenanceSchedule(approver));

        return OK(serialize(contentType, evolve(reply)),
                  stringify(contentType));
      }));
}


// Handler for the `UPDATE_MAINTENANCE_SCHEDULE` call: forwards the
// schedule carried by `call` to `_updateMaintenanceSchedule`. The content
// type is not used by this handler.
Future<Response> Master::Http::updateMaintenanceSchedule(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  CHECK_EQ(mesos::master::Call::UPDATE_MAINTENANCE_SCHEDULE, call.type());
  CHECK(call.has_update_maintenance_schedule());

  return _updateMaintenanceSchedule(
      call.update_maintenance_schedule().schedule(),
      principal);
}


// /master/machine/down endpoint help.
//
// Builds the help text for the endpoint: a one-line summary, the possible
// response codes, the behavior of POST, and the authorization rule.
string Master::Http::MACHINE_DOWN_HELP()
{
  return HELP(
    TLDR(
        "Brings a set of machines down."),
    DESCRIPTION(
        "Returns 200 OK when the operation was successful.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "POST: Validates the request body as JSON and transitions",
        "  the list of machines into DOWN mode.  Currently, only",
        "  machines in DRAINING mode are allowed to be brought down."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "The current principal must be allowed to bring down all the machines",
        "in the request, otherwise the request will fail."));
}


// /master/machine/down endpoint handler.
//
// Accepts only POST. Parses the request body as a JSON array of
// `MachineID`s, obtains the object approver for `START_MAINTENANCE` (or
// one that accepts everything when no authorizer is configured) and
// continues in `_startMaintenance`. A non-leading master redirects to the
// leader; an unparsable body yields 400 Bad Request.
Future<Response> Master::Http::machineDown(
    const Request& request,
    const Option<Principal>& principal) const
{
  // When current master is not the leader, redirect to the leading master.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // Parse the POST body as JSON.
  Try<JSON::Array> body = JSON::parse<JSON::Array>(request.body);
  if (body.isError()) {
    return BadRequest(body.error());
  }

  // Convert the machines to a protobuf.
  auto parsed = ::protobuf::parse<RepeatedPtrField<MachineID>>(body.get());
  if (parsed.isError()) {
    return BadRequest(parsed.error());
  }

  Future<Owned<ObjectApprover>> approver;

  if (master->authorizer.isNone()) {
    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    approver = master->authorizer.get()->getObjectApprover(
        subject, authorization::START_MAINTENANCE);
  }

  return approver.then(defer(
      master->self(),
      [this, parsed](const Owned<ObjectApprover>& resolved) {
        return _startMaintenance(parsed.get(), resolved);
      }));
}


// Transitions the given machines from `DRAINING` into `DOWN` mode.
//
// Replies with 400 Bad Request if the list fails validation or if a
// machine is unknown to the master or not in `DRAINING` mode, with
// 500 Internal Server Error if an authorization check fails with an
// error, and with 403 Forbidden if `approver` rejects a machine.
// Otherwise the change is applied through the registrar; afterwards every
// agent registered on those machines is told to shut down and removed,
// the machines are marked `DOWN`, and 200 OK is returned.
Future<Response> Master::Http::_startMaintenance(
    const RepeatedPtrField<MachineID>& machineIds,
    const Owned<ObjectApprover>& approver) const
{
  // Validate every machine in the list.
  Try<Nothing> validation = maintenance::validation::machines(machineIds);
  if (validation.isError()) {
    return BadRequest(validation.error());
  }

  // Check that all machines are part of a maintenance schedule.
  // TODO(josephw): Allow a transition from `UP` to `DOWN`.
  foreach (const MachineID& machineId, machineIds) {
    if (!master->machines.contains(machineId)) {
      return BadRequest(
          "Machine '" + stringify(JSON::protobuf(machineId)) +
            "' is not part of a maintenance schedule");
    }

    const bool draining =
      master->machines[machineId].info.mode() == MachineInfo::DRAINING;

    if (!draining) {
      return BadRequest(
          "Machine '" + stringify(JSON::protobuf(machineId)) +
            "' is not in DRAINING mode and cannot be brought down");
    }

    Try<bool> verdict = approver->approved(ObjectApprover::Object(machineId));

    if (verdict.isError()) {
      return InternalServerError("Authorization error: " + verdict.error());
    }

    if (!verdict.get()) {
      return Forbidden();
    }
  }

  return master->registrar->apply(Owned<Operation>(
      new maintenance::StartMaintenance(machineIds)))
    .then(defer(master->self(), [=](bool result) -> Future<Response> {
      // See the top comment in "master/maintenance.hpp" for why this check
      // is here, and is appropriate.
      CHECK(result);

      // We currently send a `ShutdownMessage` to each slave. This terminates
      // all the executors for all the frameworks running on that slave.
      // We also manually remove the slave to force sending TASK_LOST updates
      // for all the tasks that were running on the slave and `LostSlaveMessage`
      // messages to the framework. This guards against the slave having dropped
      // the `ShutdownMessage`.
      foreach (const MachineID& machineId, machineIds) {
        // The machine may not be in machines. This means no slaves are
        // currently registered on that machine so this is a no-op.
        if (!master->machines.contains(machineId)) {
          continue;
        }

        // NOTE: Copies are needed because removeSlave modifies
        // master->machines.
        foreach (
            const SlaveID& slaveId,
            utils::copy(master->machines[machineId].slaves)) {
          Slave* slave = master->slaves.registered.get(slaveId);
          CHECK_NOTNULL(slave);

          // Tell the slave to shut down.
          ShutdownMessage shutdown;
          shutdown.set_message("Operator initiated 'Machine DOWN'");
          master->send(slave->pid, shutdown);

          // Immediately remove the slave to force sending `TASK_LOST` status
          // updates as well as `LostSlaveMessage` messages to the frameworks.
          // See comment above.
          master->removeSlave(slave, "Operator initiated 'Machine DOWN'");
        }
      }

      // Update the master's local state with the downed machines.
      foreach (const MachineID& machineId, machineIds) {
        master->machines[machineId].info.set_mode(MachineInfo::DOWN);
      }

      return OK();
    }));
}


// Handler for the `START_MAINTENANCE` call.
//
// Obtains the object approver for `START_MAINTENANCE` (or one that
// accepts everything when no authorizer is configured) and continues in
// `_startMaintenance` with the machines listed in `call`. The content
// type is not used by this handler.
Future<Response> Master::Http::startMaintenance(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  CHECK_EQ(mesos::master::Call::START_MAINTENANCE, call.type());
  CHECK(call.has_start_maintenance());

  // Copied so that the continuation below owns its own list of machines.
  RepeatedPtrField<MachineID> machineIds = call.start_maintenance().machines();

  Future<Owned<ObjectApprover>> approver;

  if (master->authorizer.isNone()) {
    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    const Option<authorization::Subject> subject = createSubject(principal);

    approver = master->authorizer.get()->getObjectApprover(
        subject, authorization::START_MAINTENANCE);
  }

  return approver.then(defer(
      master->self(),
      [this, machineIds](const Owned<ObjectApprover>& resolved) {
        return _startMaintenance(machineIds, resolved);
      }));
}


// /master/machine/up endpoint help.
//
// Builds the help text for the endpoint: a one-line summary, the possible
// response codes, the behavior of POST, and the authorization rule.
string Master::Http::MACHINE_UP_HELP()
{
  return HELP(
    TLDR(
        "Brings a set of machines back up."),
    DESCRIPTION(
        "Returns 200 OK when the operation was successful.",
        "",
        "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
        "current master is not the leader.",
        "",
        "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
        "found.",
        "",
        "POST: Validates the request body as JSON and transitions",
        "  the list of machines into UP mode.  This also removes",
        "  the list of machines from the maintenance schedule."),
    AUTHENTICATION(true),
    AUTHORIZATION(
        "The current principal must be allowed to bring up all the machines",
        "in the request, otherwise the request will fail."));
}


// /master/machine/up endpoint handler.
//
// Accepts a POST whose body is a JSON array of machine IDs, obtains an
// object approver for the `STOP_MAINTENANCE` action and delegates the
// actual work to `_stopMaintenance()`.
Future<Response> Master::Http::machineUp(
    const Request& request,
    const Option<Principal>& principal) const
{
  // A master that is not the elected leader redirects the request.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // The body has to be a JSON array ...
  Try<JSON::Array> parsedBody = JSON::parse<JSON::Array>(request.body);
  if (parsedBody.isError()) {
    return BadRequest(parsedBody.error());
  }

  // ... whose elements convert into `MachineID` protobufs.
  auto machineIds =
    ::protobuf::parse<RepeatedPtrField<MachineID>>(parsedBody.get());
  if (machineIds.isError()) {
    return BadRequest(machineIds.error());
  }

  Future<Owned<ObjectApprover>> futureApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured, so use an `AcceptingObjectApprover`.
    futureApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    futureApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::STOP_MAINTENANCE);
  }

  return futureApprover.then(defer(
      master->self(),
      [this, machineIds](const Owned<ObjectApprover>& approver) {
        return _stopMaintenance(machineIds.get(), approver);
      }));
}


// Common implementation used by `machineUp()` and `stopMaintenance()`.
//
// Validates the machine list, checks that each machine is known, is in
// DOWN mode and is approved by `approver`, then applies a
// `maintenance::StopMaintenance` operation through the registrar. Once the
// registrar has applied it, the master's local machine state and
// maintenance schedules are updated to match.
Future<Response> Master::Http::_stopMaintenance(
    const RepeatedPtrField<MachineID>& machineIds,
    const Owned<ObjectApprover>& approver) const
{
  // Reject the request if the machine list itself does not validate.
  Try<Nothing> validation = maintenance::validation::machines(machineIds);
  if (validation.isError()) {
    return BadRequest(validation.error());
  }

  // Each machine must be part of a maintenance schedule, must currently be
  // DOWN, and must be approved for the requesting principal.
  foreach (const MachineID& machineId, machineIds) {
    if (!master->machines.contains(machineId)) {
      return BadRequest(
          "Machine '" + stringify(JSON::protobuf(machineId)) +
            "' is not part of a maintenance schedule");
    }

    if (master->machines[machineId].info.mode() != MachineInfo::DOWN) {
      return BadRequest(
          "Machine '" + stringify(JSON::protobuf(machineId)) +
            "' is not in DOWN mode and cannot be brought up");
    }

    Try<bool> authorized =
      approver->approved(ObjectApprover::Object(machineId));

    if (authorized.isError()) {
      return InternalServerError(
          "Authorization error: " + authorized.error());
    }

    if (!authorized.get()) {
      return Forbidden();
    }
  }

  Owned<Operation> stopOperation(
      new maintenance::StopMaintenance(machineIds));

  return master->registrar->apply(stopOperation)
    .then(defer(
        master->self(),
        [this, machineIds](bool result) -> Future<Response> {
      // See the top comment in "master/maintenance.hpp" for why this check
      // is here, and is appropriate.
      CHECK(result);

      // Flip the machines back to UP in the master's local state, drop
      // their unavailability, and remember which ones were touched.
      hashset<MachineID> reactivated;
      foreach (const MachineID& machineId, machineIds) {
        auto& info = master->machines[machineId].info;
        info.set_mode(MachineInfo::UP);
        info.clear_unavailability();
        reactivated.insert(machineId);
      }

      // Remove the reactivated machines from every schedule. Windows and
      // machine IDs are walked back to front so that deleting an element
      // does not shift the indices still to be visited.
      auto schedule = master->maintenance.schedules.begin();
      while (schedule != master->maintenance.schedules.end()) {
        for (int w = schedule->windows_size() - 1; w >= 0; --w) {
          mesos::maintenance::Window* window = schedule->mutable_windows(w);

          // Drop the individual machines from this window.
          for (int m = window->machine_ids_size() - 1; m >= 0; --m) {
            if (reactivated.contains(window->machine_ids(m))) {
              window->mutable_machine_ids()->DeleteSubrange(m, 1);
            }
          }

          // A window without machines is removed as well.
          if (window->machine_ids_size() == 0) {
            schedule->mutable_windows()->DeleteSubrange(w, 1);
          }
        }

        // A schedule without windows is removed; `erase()` yields the
        // iterator to continue from.
        if (schedule->windows_size() == 0) {
          schedule = master->maintenance.schedules.erase(schedule);
        } else {
          ++schedule;
        }
      }

      return OK();
    }));
}


// Handles a `STOP_MAINTENANCE` call: obtains an object approver for the
// `STOP_MAINTENANCE` action and, once it is available, forwards the
// machines named in the call to `_stopMaintenance()`.
Future<Response> Master::Http::stopMaintenance(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  CHECK_EQ(mesos::master::Call::STOP_MAINTENANCE, call.type());
  CHECK(call.has_stop_maintenance());

  // Take a private copy of the machine list; the continuation below
  // captures it by value.
  RepeatedPtrField<MachineID> machines = call.stop_maintenance().machines();

  Future<Owned<ObjectApprover>> futureApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured, so use an `AcceptingObjectApprover`.
    futureApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    futureApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::STOP_MAINTENANCE);
  }

  return futureApprover.then(defer(
      master->self(),
      [this, machines](const Owned<ObjectApprover>& approver) {
        return _stopMaintenance(machines, approver);
      }));
}


// /master/maintenance/status endpoint help.
string Master::Http::MAINTENANCE_STATUS_HELP()
{
  // Build each section of the help text separately, then combine them.
  const auto summary = TLDR(
      "Retrieves the maintenance status of the cluster.");

  const auto details = DESCRIPTION(
      "Returns 200 OK when the maintenance status was queried successfully.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "Returns an object with one list of machines per machine mode.",
      "For draining machines, this list includes the frameworks' responses",
      "to inverse offers.",
      "**NOTE**:",
      "Inverse offer responses are cleared if the master fails over.",
      "However, new inverse offers will be sent once the master recovers.");

  const auto authenticationSection = AUTHENTICATION(true);

  const auto authorizationSection = AUTHORIZATION(
      "The response will contain only the maintenance status for those",
      "machines the current principal is allowed to see. If none, an empty",
      "response will be returned.");

  return HELP(summary, details, authenticationSection, authorizationSection);
}


// /master/maintenance/status endpoint handler.
//
// Accepts a GET, obtains an object approver for the
// `GET_MAINTENANCE_STATUS` action, collects the cluster status through
// `_getMaintenanceStatus()` and returns it as JSON (honoring an optional
// "jsonp" query parameter).
Future<Response> Master::Http::maintenanceStatus(
    const Request& request,
    const Option<Principal>& principal) const
{
  // A master that is not the elected leader redirects the request.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "GET") {
    return MethodNotAllowed({"GET"}, request.method);
  }

  Option<string> jsonp = request.url.query.get("jsonp");

  Future<Owned<ObjectApprover>> futureApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured, so use an `AcceptingObjectApprover`.
    futureApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    futureApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::GET_MAINTENANCE_STATUS);
  }

  // First resolve the cluster status in the master's context ...
  Future<mesos::maintenance::ClusterStatus> clusterStatus =
    futureApprover.then(defer(
        master->self(),
        [this](const Owned<ObjectApprover>& approver) {
          return _getMaintenanceStatus(approver);
        }));

  // ... then render it as the HTTP response.
  return clusterStatus.then(
      [jsonp](const mesos::maintenance::ClusterStatus& status) -> Response {
        return OK(JSON::protobuf(status), jsonp);
      });
}


// Builds the `ClusterStatus` shared by `maintenanceStatus()` and
// `getMaintenanceStatus()`. Only machines that `approver` approves are
// included; DRAINING machines additionally carry the inverse offer
// statuses reported by the allocator for their agents.
Future<mesos::maintenance::ClusterStatus> Master::Http::_getMaintenanceStatus(
    const Owned<ObjectApprover>& approver) const
{
  return master->allocator->getInverseOfferStatuses()
    .then(defer(
        master->self(),
        [this, approver](
            hashmap<
                SlaveID,
                hashmap<FrameworkID, mesos::allocator::InverseOfferStatus>>
              inverseOfferStatuses)
          -> Future<mesos::maintenance::ClusterStatus> {
      // Unwrap the master's machine information into two arrays of
      // machines. The data is coming from the allocator and therefore
      // could be stale. Also, if the master fails over, this data is
      // cleared.
      mesos::maintenance::ClusterStatus clusterStatus;

      foreachpair (
          const MachineID& machineId,
          const Machine& machine,
          master->machines) {
        Try<bool> visible =
          approver->approved(ObjectApprover::Object(machineId));

        if (visible.isError()) {
          LOG(WARNING) << "Error during MachineID authorization: "
                       << visible.error();
          // TODO(arojas): Consider exposing these errors to the caller.
          continue;
        }

        // Machines the principal is not approved for are left out.
        if (!visible.get()) {
          continue;
        }

        if (machine.info.mode() == MachineInfo::DRAINING) {
          mesos::maintenance::ClusterStatus::DrainingMachine* draining =
            clusterStatus.add_draining_machines();

          draining->mutable_id()->CopyFrom(machineId);

          // Attach the inverse offer statuses the allocator reported for
          // each agent on this machine.
          foreach (const SlaveID& slaveId, machine.slaves) {
            if (!inverseOfferStatuses.contains(slaveId)) {
              continue;
            }

            foreachvalue (
                const mesos::allocator::InverseOfferStatus& offerStatus,
                inverseOfferStatuses[slaveId]) {
              draining->add_statuses()->CopyFrom(offerStatus);
            }
          }
        } else if (machine.info.mode() == MachineInfo::DOWN) {
          clusterStatus.add_down_machines()->CopyFrom(machineId);
        }

        // Currently, `UP` machines are not specifically tracked in the
        // master, so `UP` (and any other mode) contributes nothing.
      }

      return clusterStatus;
    }));
}


// Handles a `GET_MAINTENANCE_STATUS` call: obtains an object approver for
// the `GET_MAINTENANCE_STATUS` action, collects the cluster status through
// `_getMaintenanceStatus()` and serializes it using `contentType`.
Future<Response> Master::Http::getMaintenanceStatus(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType contentType) const
{
  CHECK_EQ(mesos::master::Call::GET_MAINTENANCE_STATUS, call.type());

  Future<Owned<ObjectApprover>> futureApprover;

  if (master->authorizer.isNone()) {
    // No authorizer is configured, so use an `AcceptingObjectApprover`.
    futureApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
  } else {
    Option<authorization::Subject> subject = createSubject(principal);

    futureApprover = master->authorizer.get()->getObjectApprover(
        subject, authorization::GET_MAINTENANCE_STATUS);
  }

  // First resolve the cluster status in the master's context ...
  Future<mesos::maintenance::ClusterStatus> clusterStatus =
    futureApprover.then(defer(
        master->self(),
        [this](const Owned<ObjectApprover>& approver) {
          return _getMaintenanceStatus(approver);
        }));

  // ... then wrap it into a `GET_MAINTENANCE_STATUS` response message.
  return clusterStatus.then(
      [contentType](const mesos::maintenance::ClusterStatus& status)
          -> Response {
        mesos::master::Response reply;
        reply.set_type(mesos::master::Response::GET_MAINTENANCE_STATUS);

        auto* payload = reply.mutable_get_maintenance_status();
        payload->mutable_status()->CopyFrom(status);

        return OK(serialize(contentType, evolve(reply)),
                  stringify(contentType));
      });
}


// Help text describing the unreserve endpoint.
string Master::Http::UNRESERVE_HELP()
{
  // Build each section of the help text separately, then combine them.
  const auto summary = TLDR(
      "Unreserve resources dynamically on a specific agent.");

  const auto details = DESCRIPTION(
      "Returns 202 ACCEPTED which indicates that the unreserve",
      "operation has been validated successfully by the master.",
      "",
      "Returns 307 TEMPORARY_REDIRECT redirect to the leading master when",
      "current master is not the leader.",
      "",
      "Returns 503 SERVICE_UNAVAILABLE if the leading master cannot be",
      "found.",
      "",
      "The request is then forwarded asynchronously to the Mesos",
      "agent where the reserved resources are located.",
      "That asynchronous message may not be delivered or",
      "unreserving resources at the agent might fail.",
      "",
      "Please provide \"slaveId\" and \"resources\" values describing",
      "the resources to be unreserved.");

  const auto authenticationSection = AUTHENTICATION(true);

  const auto authorizationSection = AUTHORIZATION(
      "Using this endpoint to unreserve resources requires that the",
      "current principal is authorized to unreserve resources created",
      "by the principal who reserved the resources.",
      "See the authorization documentation for details.");

  return HELP(summary, details, authenticationSection, authorizationSection);
}


// Endpoint handler for unreserving resources. Expects a POST whose body is
// a query string carrying "slaveId" and "resources" (a JSON array of
// `Resource` objects), converts both and hands them to `_unreserve()`.
Future<Response> Master::Http::unreserve(
    const Request& request,
    const Option<Principal>& principal) const
{
  // TODO(greggomann): Remove this check once the `Principal` type is used in
  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
  // See MESOS-7202.
  if (principal.isSome() && principal->value.isNone()) {
    return Forbidden(
        "The request's authenticated principal contains claims, but no value "
        "string. The master currently requires that principals have a value");
  }

  // A master that is not the elected leader redirects the request.
  if (!master->elected()) {
    return redirect(request);
  }

  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  // The request body is itself encoded as a query string.
  Try<hashmap<string, string>> decoded =
    process::http::query::decode(request.body);

  if (decoded.isError()) {
    return BadRequest("Unable to decode query string: " + decoded.error());
  }

  const hashmap<string, string>& parameters = decoded.get();

  // Extract the agent ID.
  Option<string> slaveIdParameter = parameters.get("slaveId");
  if (slaveIdParameter.isNone()) {
    return BadRequest("Missing 'slaveId' query parameter in the request body");
  }

  SlaveID slaveId;
  slaveId.set_value(slaveIdParameter.get());

  // Extract the resources, given as a JSON array.
  Option<string> resourcesParameter = parameters.get("resources");
  if (resourcesParameter.isNone()) {
    return BadRequest(
        "Missing 'resources' query parameter in the request body");
  }

  Try<JSON::Array> resourcesJson =
    JSON::parse<JSON::Array>(resourcesParameter.get());

  if (resourcesJson.isError()) {
    return BadRequest(
        "Error in parsing 'resources' query parameter in the request body: " +
        resourcesJson.error());
  }

  // Convert each array element into a `Resource` protobuf; the first
  // element that fails to convert aborts the request.
  RepeatedPtrField<Resource> resources;
  foreach (const JSON::Value& element, resourcesJson.get().values) {
    Try<Resource> resource = ::protobuf::parse<Resource>(element);
    if (resource.isError()) {
      return BadRequest(
          "Error in parsing 'resources' query parameter in the request body: " +
          resource.error());
    }

    resources.Add()->CopyFrom(resource.get());
  }

  return _unreserve(slaveId, resources, principal);
}


// Common implementation used by `unreserve()` and `unreserveResources()`.
//
// Builds an `UNRESERVE` offer operation for the given agent and resources,
// validates it, asks the master to authorize it for `principal`, and on
// approval passes it on to `_operation()`.
Future<Response> Master::Http::_unreserve(
    const SlaveID& slaveId,
    const RepeatedPtrField<Resource>& resources,
    const Option<Principal>& principal) const
{
  // The agent has to be registered with this master.
  if (master->slaves.registered.get(slaveId) == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // Assemble the offer operation to be applied.
  Offer::Operation operation;
  operation.set_type(Offer::Operation::UNRESERVE);
  operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);

  Option<Error> normalizationError = validateAndNormalizeResources(&operation);
  if (normalizationError.isSome()) {
    return BadRequest(normalizationError->message);
  }

  Option<Error> validationError =
    validation::operation::validate(operation.unreserve());
  if (validationError.isSome()) {
    return BadRequest(
        "Invalid UNRESERVE operation: " + validationError->message);
  }

  return master->authorizeUnreserveResources(operation.unreserve(), principal)
    .then(defer(
        master->self(),
        [this, slaveId, operation](bool authorized) -> Future<Response> {
      if (authorized) {
        return _operation(
            slaveId, operation.unreserve().resources(), operation);
      }

      return Forbidden();
    }));
}


// Applies `operation` to the agent identified by `slaveId`.
//
// Before applying, outstanding offers on that agent are rescinded one at a
// time for as long as they contribute towards `required`. Rescinding stops
// early once `operation` can be applied to the resources recovered so far.
//
// Responds with:
//   - `BadRequest` if no registered agent has the given ID,
//   - `Accepted` if `master->apply()` completes successfully,
//   - `Conflict` (carrying the failure message) if it fails.
//
// NOTE: `required` is taken by value because it is decremented locally
// while offers are being rescinded.
Future<Response> Master::Http::_operation(
    const SlaveID& slaveId,
    Resources required,
    const Offer::Operation& operation) const
{
  Slave* slave = master->slaves.registered.get(slaveId);
  if (slave == nullptr) {
    return BadRequest("No agent found with specified ID");
  }

  // The resources recovered by rescinding outstanding offers.
  Resources totalRecovered;

  // We pessimistically assume that what seems like "available"
  // resources in the allocator will be gone. This can happen due to
  // the race between the allocator scheduling an 'allocate' call to
  // itself vs master's request to schedule 'updateAvailable'.
  // We greedily rescind one offer at time until we've rescinded
  // enough offers to cover 'operation'.
  //
  // NOTE: We iterate over a copy of the agent's offers because
  // 'removeOffer' is called inside the loop.
  foreach (Offer* offer, utils::copy(slave->offers)) {
    // If rescinding the offer would not contribute to satisfying
    // the required resources, skip it.
    Resources recovered = offer->resources();
    recovered.unallocate();

    // Subtracting 'recovered' left 'required' unchanged, i.e. this
    // offer holds nothing of what is still needed.
    if (required == required - recovered) {
      continue;
    }

    totalRecovered += recovered;
    required -= recovered;

    // We explicitly pass 'Filters()' which has a default 'refuse_seconds'
    // of 5 seconds rather than 'None()' here, so that we can virtually
    // always win the race against 'allocate' if these resources are to
    // be offered back to these frameworks.
    // NOTE: However it's entirely possible that these resources are
    // offered to other frameworks in the next 'allocate' and the filter
    // cannot prevent it.
    master->allocator->recoverResources(
        offer->framework_id(),
        offer->slave_id(),
        offer->resources(),
        Filters());

    // NOTE: 'offer' is not used again after this call.
    master->removeOffer(offer, true); // Rescind!

    // If we've rescinded enough offers to cover 'operation', we're done.
    Try<Resources> updatedRecovered = totalRecovered.apply(operation);
    if (updatedRecovered.isSome()) {
      break;
    }
  }

  // Propagate the 'Future<Nothing>' as 'Future<Response>' where
  // 'Nothing' -> 'Accepted' and Failed -> 'Conflict'.
  return master->apply(slave, operation)
    .then([]() -> Response { return Accepted(); })
    .repair([](const Future<Response>& result) {
       return Conflict(result.failure());
    });
}


// Handles an `UNRESERVE_RESOURCES` call by extracting the agent ID and the
// resources from the call and delegating to `_unreserve()`.
//
// The content type is not needed here, so the parameter name is commented
// out, matching `startMaintenance()` and `stopMaintenance()`.
Future<Response> Master::Http::unreserveResources(
    const mesos::master::Call& call,
    const Option<Principal>& principal,
    ContentType /*contentType*/) const
{
  CHECK_EQ(mesos::master::Call::UNRESERVE_RESOURCES, call.type());

  const SlaveID& slaveId = call.unreserve_resources().slave_id();
  const RepeatedPtrField<Resource>& resources =
    call.unreserve_resources().resources();

  return _unreserve(slaveId, resources, principal);
}

} // namespace master {
} // namespace internal {
} // namespace mesos {
