// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <iomanip>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include <boost/array.hpp>
#include <mesos/attributes.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/maintenance/maintenance.hpp>
#include <process/defer.hpp>
#include <process/help.hpp>
#include <process/metrics/metrics.hpp>
#include <stout/base64.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/jsonify.hpp>
#include <stout/lambda.hpp>
#include <stout/net.hpp>
#include <stout/nothing.hpp>
#include <stout/numify.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/representation.hpp>
#include <stout/result.hpp>
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "common/build.hpp"
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "internal/devolve.hpp"
#include "logging/logging.hpp"
#include "master/machine.hpp"
#include "master/maintenance.hpp"
#include "master/master.hpp"
#include "master/validation.hpp"
#include "mesos/mesos.hpp"
#include "mesos/resources.hpp"
using google::protobuf::RepeatedPtrField;
using process::Clock;
using process::DESCRIPTION;
using process::Future;
using process::HELP;
using process::TLDR;
using process::http::Accepted;
using process::http::BadRequest;
using process::http::Conflict;
using process::http::Forbidden;
using process::http::InternalServerError;
using process::http::MethodNotAllowed;
using process::http::NotFound;
using process::http::NotImplemented;
using process::http::NotAcceptable;
using process::http::OK;
using process::http::Pipe;
using process::http::ServiceUnavailable;
using process::http::TemporaryRedirect;
using process::http::UnsupportedMediaType;
using process::metrics::internal::MetricsProcess;
using std::list;
using std::map;
using std::set;
using std::string;
using std::vector;
namespace mesos {
void json(
JSON::StringWriter* writer, const FrameworkInfo::Capability& capability)
{
writer->append(FrameworkInfo::Capability::Type_Name(capability.type()));
}
void json(JSON::ObjectWriter* writer, const Offer& offer)
{
writer->field("id", offer.id().value());
writer->field("framework_id", offer.framework_id().value());
writer->field("slave_id", offer.slave_id().value());
writer->field("resources", Resources(offer.resources()));
}
namespace internal {
namespace master {
// Pull in model overrides from common.
using mesos::internal::model;
// Pull in definitions from process.
using process::http::Response;
using process::http::Request;
using process::Owned;
// The summary representation of `T` to support the `/state-summary` endpoint.
// e.g., `Summary<Slave>`.
template <typename T>
struct Summary : Representation<T>
{
using Representation<T>::Representation;
};
// The full representation of `T` to support the `/state` endpoint.
// e.g., `Full<Slave>`.
template <typename T>
struct Full : Representation<T>
{
using Representation<T>::Representation;
};
void json(JSON::ObjectWriter* writer, const Summary<Slave>& summary)
{
const Slave& slave = summary;
writer->field("id", slave.id.value());
writer->field("pid", string(slave.pid));
writer->field("hostname", slave.info.hostname());
writer->field("registered_time", slave.registeredTime.secs());
if (slave.reregisteredTime.isSome()) {
writer->field("reregistered_time", slave.reregisteredTime.get().secs());
}
const Resources& totalResources = slave.totalResources;
writer->field("resources", totalResources);
writer->field("used_resources", Resources::sum(slave.usedResources));
writer->field("offered_resources", slave.offeredResources);
writer->field("reserved_resources", totalResources.reserved());
writer->field("unreserved_resources", totalResources.unreserved());
writer->field("attributes", Attributes(slave.info.attributes()));
writer->field("active", slave.active);
writer->field("version", slave.version);
}
void json(JSON::ObjectWriter* writer, const Full<Slave>& full)
{
const Slave& slave = full;
json(writer, Summary<Slave>(slave));
}
void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary)
{
const Framework& framework = summary;
writer->field("id", framework.id().value());
writer->field("name", framework.info.name());
// Omit pid for http frameworks.
if (framework.pid.isSome()) {
writer->field("pid", string(framework.pid.get()));
}
// TODO(bmahler): Use these in the webui.
writer->field("used_resources", framework.totalUsedResources);
writer->field("offered_resources", framework.totalOfferedResources);
writer->field("capabilities", framework.info.capabilities());
writer->field("hostname", framework.info.hostname());
writer->field("webui_url", framework.info.webui_url());
writer->field("active", framework.active);
}
void json(JSON::ObjectWriter* writer, const Full<Framework>& full)
{
const Framework& framework = full;
json(writer, Summary<Framework>(framework));
// Add additional fields to those generated by the
// `Summary<Framework>` overload.
writer->field("user", framework.info.user());
writer->field("failover_timeout", framework.info.failover_timeout());
writer->field("checkpoint", framework.info.checkpoint());
writer->field("role", framework.info.role());
writer->field("registered_time", framework.registeredTime.secs());
writer->field("unregistered_time", framework.unregisteredTime.secs());
if (framework.info.has_principal()) {
writer->field("principal", framework.info.principal());
}
// TODO(bmahler): Consider deprecating this in favor of the split
// used and offered resources added in `Summary<Framework>`.
writer->field(
"resources",
framework.totalUsedResources + framework.totalOfferedResources);
// TODO(benh): Consider making reregisteredTime an Option.
if (framework.registeredTime != framework.reregisteredTime) {
writer->field("reregistered_time", framework.reregisteredTime.secs());
}
// Model all of the tasks associated with a framework.
writer->field("tasks", [&framework](JSON::ArrayWriter* writer) {
foreachvalue (const TaskInfo& taskInfo, framework.pendingTasks) {
writer->element([&framework, &taskInfo](JSON::ObjectWriter* writer) {
writer->field("id", taskInfo.task_id().value());
writer->field("name", taskInfo.name());
writer->field("framework_id", framework.id().value());
writer->field("executor_id", taskInfo.executor().executor_id().value());
writer->field("slave_id", taskInfo.slave_id().value());
writer->field("state", TaskState_Name(TASK_STAGING));
writer->field("resources", Resources(taskInfo.resources()));
writer->field("statuses", std::initializer_list<TaskStatus>{});
if (taskInfo.has_labels()) {
writer->field("labels", taskInfo.labels());
}
if (taskInfo.has_discovery()) {
writer->field("discovery", JSON::Protobuf(taskInfo.discovery()));
}
if (taskInfo.has_container()) {
writer->field("container", JSON::Protobuf(taskInfo.container()));
}
});
}
foreachvalue (Task* task, framework.tasks) {
writer->element(*task);
}
});
writer->field("completed_tasks", [&framework](JSON::ArrayWriter* writer) {
foreach (const std::shared_ptr<Task>& task, framework.completedTasks) {
writer->element(*task);
}
});
// Model all of the offers associated with a framework.
writer->field("offers", [&framework](JSON::ArrayWriter* writer) {
foreach (Offer* offer, framework.offers) {
writer->element(*offer);
}
});
// Model all of the executors of a framework.
writer->field("executors", [&framework](JSON::ArrayWriter* writer) {
foreachpair (
const SlaveID& slaveId, const auto& executorsMap, framework.executors) {
foreachvalue (const ExecutorInfo& executor, executorsMap) {
writer->element([&executor, &slaveId](JSON::ObjectWriter* writer) {
json(writer, executor);
writer->field("slave_id", slaveId.value());
});
}
}
});
// Model all of the labels associated with a framework.
if (framework.info.has_labels()) {
writer->field("labels", framework.info.labels());
}
}
void Master::Http::log(const Request& request)
{
Option<string> userAgent = request.headers.get("User-Agent");
Option<string> forwardedFor = request.headers.get("X-Forwarded-For");
LOG(INFO) << "HTTP " << request.method << " for " << request.url.path
<< " from " << request.client
<< (userAgent.isSome()
? " with User-Agent='" + userAgent.get() + "'"
: "")
<< (forwardedFor.isSome()
? " with X-Forwarded-For='" + forwardedFor.get() + "'"
: "");
}
// TODO(ijimenez): Add some information or pointers to help
// users understand the HTTP Event/Call API.
string Master::Http::SCHEDULER_HELP()
{
return HELP(
TLDR(
"Endpoint for schedulers to make Calls against the master."),
DESCRIPTION(
"Returns 202 Accepted iff the request is accepted."));
}
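// Illustrative only (not from the original sources): a minimal SUBSCRIBE
// call in JSON form that the handler below would accept when POSTed with
// 'Content-Type: application/json'. Field names follow the v1 scheduler
// `Call` protobuf; the values are hypothetical.
//
//   {
//     "type": "SUBSCRIBE",
//     "subscribe": {
//       "framework_info": {"user": "alice", "name": "example-framework"}
//     }
//   }
//
// Non-SUBSCRIBE calls must additionally carry the 'Mesos-Stream-Id' header
// returned with the SUBSCRIBE response (see the stream ID checks below).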
Future<Response> Master::Http::scheduler(const Request& request) const
{
// TODO(vinod): Add metrics for rejected requests.
// TODO(vinod): Add support for rate limiting.
if (!master->elected()) {
// Note that this could happen if the scheduler realizes this is the
// leading master before the master itself realizes it (e.g., due to a
// ZK watch delay).
if (master->leader.isNone()) {
return ServiceUnavailable("No leader elected");
} else {
return redirect(request);
}
}
CHECK_SOME(master->recovered);
if (!master->recovered.get().isReady()) {
return ServiceUnavailable("Master has not finished recovery");
}
if (master->flags.authenticate_frameworks) {
return Forbidden(
"HTTP schedulers are not supported when authentication is required");
}
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
v1::scheduler::Call v1Call;
// TODO(anand): Content type values are case-insensitive.
Option<string> contentType = request.headers.get("Content-Type");
if (contentType.isNone()) {
return BadRequest("Expecting 'Content-Type' to be present");
}
if (contentType.get() == APPLICATION_PROTOBUF) {
if (!v1Call.ParseFromString(request.body)) {
return BadRequest("Failed to parse body into Call protobuf");
}
} else if (contentType.get() == APPLICATION_JSON) {
Try<JSON::Value> value = JSON::parse(request.body);
if (value.isError()) {
return BadRequest("Failed to parse body into JSON: " + value.error());
}
Try<v1::scheduler::Call> parse =
::protobuf::parse<v1::scheduler::Call>(value.get());
if (parse.isError()) {
return BadRequest("Failed to convert JSON into Call protobuf: " +
parse.error());
}
v1Call = parse.get();
} else {
return UnsupportedMediaType(
string("Expecting 'Content-Type' of ") +
APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
}
scheduler::Call call = devolve(v1Call);
Option<Error> error = validation::scheduler::call::validate(call);
if (error.isSome()) {
return BadRequest("Failed to validate Scheduler::Call: " +
error.get().message);
}
if (call.type() == scheduler::Call::SUBSCRIBE) {
// We default to JSON since an empty 'Accept' header results in
// all media types being considered acceptable.
ContentType responseContentType;
if (request.acceptsMediaType(APPLICATION_JSON)) {
responseContentType = ContentType::JSON;
} else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
responseContentType = ContentType::PROTOBUF;
} else {
return NotAcceptable(
string("Expecting 'Accept' to allow ") +
"'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
}
// Make sure that a stream ID was not included in the request headers.
if (request.headers.contains("Mesos-Stream-Id")) {
return BadRequest(
"Subscribe calls should not include the 'Mesos-Stream-Id' header");
}
Pipe pipe;
OK ok;
ok.headers["Content-Type"] = stringify(responseContentType);
ok.type = Response::PIPE;
ok.reader = pipe.reader();
// Generate a stream ID and return it in the response.
UUID streamId = UUID::random();
ok.headers["Mesos-Stream-Id"] = streamId.toString();
HttpConnection http {pipe.writer(), responseContentType, streamId};
master->subscribe(http, call.subscribe());
return ok;
}
// We consolidate the framework lookup logic here because it is
// common to all the call handlers.
Framework* framework = master->getFramework(call.framework_id());
if (framework == NULL) {
return BadRequest("Framework cannot be found");
}
if (!framework->connected) {
return Forbidden("Framework is not subscribed");
}
if (framework->http.isNone()) {
return Forbidden("Framework is not connected via HTTP");
}
// This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
if (!request.headers.contains("Mesos-Stream-Id")) {
return BadRequest(
"All non-subscribe calls should include the 'Mesos-Stream-Id' header");
}
if (request.headers.at("Mesos-Stream-Id") !=
framework->http.get().streamId.toString()) {
return BadRequest(
"The stream ID included in this request didn't match the stream ID "
"currently associated with framework ID '"
+ framework->id().value() + "'");
}
switch (call.type()) {
case scheduler::Call::TEARDOWN:
master->removeFramework(framework);
return Accepted();
case scheduler::Call::ACCEPT:
master->accept(framework, call.accept());
return Accepted();
case scheduler::Call::DECLINE:
master->decline(framework, call.decline());
return Accepted();
case scheduler::Call::REVIVE:
master->revive(framework);
return Accepted();
case scheduler::Call::SUPPRESS:
master->suppress(framework);
return Accepted();
case scheduler::Call::KILL:
master->kill(framework, call.kill());
return Accepted();
case scheduler::Call::SHUTDOWN:
master->shutdown(framework, call.shutdown());
return Accepted();
case scheduler::Call::ACKNOWLEDGE:
master->acknowledge(framework, call.acknowledge());
return Accepted();
case scheduler::Call::RECONCILE:
master->reconcile(framework, call.reconcile());
return Accepted();
case scheduler::Call::MESSAGE:
master->message(framework, call.message());
return Accepted();
case scheduler::Call::REQUEST:
master->request(framework, call.request());
return Accepted();
default:
// Should be caught during call validation above.
LOG(FATAL) << "Unexpected " << call.type() << " call";
}
return NotImplemented();
}
string Master::Http::CREATE_VOLUMES_HELP()
{
return HELP(
TLDR(
"Create persistent volumes on reserved resources."),
DESCRIPTION(
"Returns 200 OK if the request was accepted. This does not",
"imply that the volume was created successfully: volume",
"creation is done asynchronously and may fail.",
"",
"Please provide \"slaveId\" and \"volumes\" values designating",
"the volumes to be created."));
}
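// Illustrative only: the `createVolumes()` handler below expects a
// form-encoded POST body with 'slaveId' and 'volumes' parameters, where
// 'volumes' is a JSON array of `Resource` objects. All identifiers and
// values here are hypothetical.
//
//   slaveId=<slave-id>&volumes=[
//     {
//       "name": "disk",
//       "type": "SCALAR",
//       "scalar": {"value": 128},
//       "role": "some-role",
//       "disk": {
//         "persistence": {"id": "some-volume-id"},
//         "volume": {"container_path": "data", "mode": "RW"}
//       }
//     }
//   ]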
static Resources removeDiskInfos(const Resources& resources)
{
Resources result;
foreach (Resource resource, resources) {
resource.clear_disk();
result += resource;
}
return result;
}
Future<Response> Master::Http::createVolumes(
const Request& request,
const Option<string>& principal) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the query string in the request body.
Try<hashmap<string, string>> decode =
process::http::query::decode(request.body);
if (decode.isError()) {
return BadRequest("Unable to decode query string: " + decode.error());
}
const hashmap<string, string>& values = decode.get();
if (values.get("slaveId").isNone()) {
return BadRequest("Missing 'slaveId' query parameter");
}
SlaveID slaveId;
slaveId.set_value(values.get("slaveId").get());
Slave* slave = master->slaves.registered.get(slaveId);
if (slave == NULL) {
return BadRequest("No slave found with specified ID");
}
if (values.get("volumes").isNone()) {
return BadRequest("Missing 'volumes' query parameter");
}
Try<JSON::Array> parse =
JSON::parse<JSON::Array>(values.get("volumes").get());
if (parse.isError()) {
return BadRequest(
"Error in parsing 'volumes' query parameter: " + parse.error());
}
Resources volumes;
foreach (const JSON::Value& value, parse.get().values) {
Try<Resource> volume = ::protobuf::parse<Resource>(value);
if (volume.isError()) {
return BadRequest(
"Error in parsing 'volumes' query parameter: " + volume.error());
}
volumes += volume.get();
}
// Create an offer operation.
Offer::Operation operation;
operation.set_type(Offer::Operation::CREATE);
operation.mutable_create()->mutable_volumes()->CopyFrom(volumes);
Option<Error> validate = validation::operation::validate(
operation.create(), slave->checkpointedResources);
if (validate.isSome()) {
return BadRequest("Invalid CREATE operation: " + validate.get().message);
}
return master->authorizeCreateVolume(operation.create(), principal)
.then(defer(master->self(), [=](bool authorized) -> Future<Response> {
if (!authorized) {
return Forbidden();
}
// The resources required for this operation are equivalent to the
// volumes specified by the user minus any DiskInfo (DiskInfo will
// be created when this operation is applied).
return _operation(slaveId, removeDiskInfos(volumes), operation);
}));
}
string Master::Http::DESTROY_VOLUMES_HELP()
{
return HELP(
TLDR(
"Destroy persistent volumes."),
DESCRIPTION(
"Returns 200 OK if the request was accepted. This does not",
"imply that the volume was destroyed successfully: volume",
"destruction is done asynchronously and may fail.",
"",
"Please provide \"slaveId\" and \"volumes\" values designating",
"the volumes to be destroyed."));
}
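// Illustrative only: the `destroyVolumes()` handler below expects the same
// form-encoded shape as the /create-volumes example above, i.e. 'slaveId'
// plus a 'volumes' JSON array describing the persistent volumes (including
// their `DiskInfo`) to destroy; all values are hypothetical.
//
//   slaveId=<slave-id>&volumes=[<persistent volume Resource in JSON form>]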
Future<Response> Master::Http::destroyVolumes(
const Request& request,
const Option<string>& principal) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the query string in the request body.
Try<hashmap<string, string>> decode =
process::http::query::decode(request.body);
if (decode.isError()) {
return BadRequest("Unable to decode query string: " + decode.error());
}
const hashmap<string, string>& values = decode.get();
if (values.get("slaveId").isNone()) {
return BadRequest("Missing 'slaveId' query parameter");
}
SlaveID slaveId;
slaveId.set_value(values.get("slaveId").get());
Slave* slave = master->slaves.registered.get(slaveId);
if (slave == NULL) {
return BadRequest("No slave found with specified ID");
}
if (values.get("volumes").isNone()) {
return BadRequest("Missing 'volumes' query parameter");
}
Try<JSON::Array> parse =
JSON::parse<JSON::Array>(values.get("volumes").get());
if (parse.isError()) {
return BadRequest(
"Error in parsing 'volumes' query parameter: " + parse.error());
}
Resources volumes;
foreach (const JSON::Value& value, parse.get().values) {
Try<Resource> volume = ::protobuf::parse<Resource>(value);
if (volume.isError()) {
return BadRequest(
"Error in parsing 'volumes' query parameter: " + volume.error());
}
volumes += volume.get();
}
// Create an offer operation.
Offer::Operation operation;
operation.set_type(Offer::Operation::DESTROY);
operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
Option<Error> validate = validation::operation::validate(
operation.destroy(), slave->checkpointedResources);
if (validate.isSome()) {
return BadRequest("Invalid DESTROY operation: " + validate.get().message);
}
return master->authorizeDestroyVolume(operation.destroy(), principal)
.then(defer(master->self(), [=](bool authorized) -> Future<Response> {
if (!authorized) {
return Forbidden();
}
return _operation(slaveId, volumes, operation);
}));
}
string Master::Http::FRAMEWORKS()
{
return HELP(TLDR("Exposes the frameworks info."));
}
Future<Response> Master::Http::frameworks(const Request& request) const
{
auto frameworks = [this](JSON::ObjectWriter* writer) {
// Model all of the frameworks.
writer->field("frameworks", [this](JSON::ArrayWriter* writer) {
foreachvalue (Framework* framework, master->frameworks.registered) {
writer->element(Full<Framework>(*framework));
}
});
// Model all of the completed frameworks.
writer->field("completed_frameworks", [this](JSON::ArrayWriter* writer) {
foreach (const std::shared_ptr<Framework>& framework,
master->frameworks.completed) {
writer->element(Full<Framework>(*framework));
}
});
// Model all currently unregistered frameworks. This can happen
// when a framework has yet to re-register after master failover.
writer->field("unregistered_frameworks", [this](JSON::ArrayWriter* writer) {
// Find unregistered frameworks.
foreachvalue (const Slave* slave, master->slaves.registered) {
foreachkey (const FrameworkID& frameworkId, slave->tasks) {
if (!master->frameworks.registered.contains(frameworkId)) {
writer->element(frameworkId.value());
}
}
}
});
};
return OK(jsonify(frameworks), request.url.query.get("jsonp"));
}
string Master::Http::FLAGS_HELP()
{
return HELP(TLDR("Exposes the master's flag configuration."));
}
Future<Response> Master::Http::flags(const Request& request) const
{
JSON::Object object;
{
JSON::Object flags;
foreachpair (const string& name, const flags::Flag& flag, master->flags) {
Option<string> value = flag.stringify(master->flags);
if (value.isSome()) {
flags.values[name] = value.get();
}
}
object.values["flags"] = std::move(flags);
}
return OK(object, request.url.query.get("jsonp"));
}
string Master::Http::HEALTH_HELP()
{
return HELP(
TLDR(
"Health check of the Master."),
DESCRIPTION(
"Returns 200 OK iff the Master is healthy.",
"Delayed responses are also indicative of poor health."));
}
Future<Response> Master::Http::health(const Request& request) const
{
return OK();
}
const static string HOSTS_KEY = "hosts";
const static string LEVEL_KEY = "level";
const static string MONITOR_KEY = "monitor";
string Master::Http::OBSERVE_HELP()
{
return HELP(
TLDR(
"Observe a monitor health state for host(s)."),
DESCRIPTION(
"This endpoint receives information indicating host(s)",
"health.",
"",
"The following fields should be supplied in a POST:",
"1. " + MONITOR_KEY + " - name of the monitor that is being reported",
"2. " + HOSTS_KEY + " - comma separated list of hosts",
"3. " + LEVEL_KEY + " - OK for healthy, anything else for unhealthy"));
}
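// Illustrative only: the `observe()` handler below expects a form-encoded
// POST body using the keys defined above; the values are hypothetical.
//
//   monitor=some-monitor&hosts=host1,host2&level=OK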
Try<string> getFormValue(
const string& key,
const hashmap<string, string>& values)
{
Option<string> value = values.get(key);
if (value.isNone()) {
return Error("Missing value for '" + key + "'");
}
// HTTP decode the value.
Try<string> decodedValue = process::http::decode(value.get());
if (decodedValue.isError()) {
return decodedValue;
}
// Treat empty string as an error.
if (decodedValue.isSome() && decodedValue.get().empty()) {
return Error("Empty string for '" + key + "'");
}
return decodedValue.get();
}
Future<Response> Master::Http::observe(const Request& request) const
{
Try<hashmap<string, string>> decode =
process::http::query::decode(request.body);
if (decode.isError()) {
return BadRequest("Unable to decode query string: " + decode.error());
}
hashmap<string, string> values = decode.get();
// Build up a JSON object of the values we received and send them back
// down the wire as JSON for validation / confirmation.
JSON::Object response;
// TODO(ccarson): As soon as RepairCoordinator is introduced it will
// consume these values. We should revisit if we still want to send the
// JSON down the wire at that point.
// Add 'monitor'.
Try<string> monitor = getFormValue(MONITOR_KEY, values);
if (monitor.isError()) {
return BadRequest(monitor.error());
}
response.values[MONITOR_KEY] = monitor.get();
// Add 'hosts'.
Try<string> hostsString = getFormValue(HOSTS_KEY, values);
if (hostsString.isError()) {
return BadRequest(hostsString.error());
}
vector<string> hosts = strings::split(hostsString.get(), ",");
JSON::Array hostArray;
hostArray.values.assign(hosts.begin(), hosts.end());
response.values[HOSTS_KEY] = hostArray;
// Add 'isHealthy'.
Try<string> level = getFormValue(LEVEL_KEY, values);
if (level.isError()) {
return BadRequest(level.error());
}
bool isHealthy = strings::upper(level.get()) == "OK";
response.values["isHealthy"] = isHealthy;
return OK(response);
}
string Master::Http::REDIRECT_HELP()
{
return HELP(
TLDR(
"Redirects to the leading Master."),
DESCRIPTION(
"This returns a 307 Temporary Redirect to the leading Master.",
"If no Master is leading (according to this Master), then the",
"Master will redirect to itself.",
"",
"**NOTES:**",
"1. This is the recommended way to bookmark the WebUI when",
"running multiple Masters.",
"2. This currently does not work \"in the cloud\" (e.g., on EC2), as",
"the redirect will use the master's private IP address, unless the",
"advertise_ip flag points to an externally accessible IP address."));
}
Future<Response> Master::Http::redirect(const Request& request) const
{
// If there's no leader, redirect to this master's base url.
MasterInfo info = master->leader.isSome()
? master->leader.get()
: master->info_;
// NOTE: Currently, 'info.ip()' stores ip in network order, which
// should be fixed. See MESOS-1201 for details.
Try<string> hostname = info.has_hostname()
? info.hostname()
: net::getHostname(net::IP(ntohl(info.ip())));
if (hostname.isError()) {
return InternalServerError(hostname.error());
}
// NOTE: We can use a protocol-relative URL here in order to allow
// the browser (or other HTTP client) to prefix with 'http:' or
// 'https:' depending on the original request. See
// https://tools.ietf.org/html/rfc7231#section-7.1.2 as well as
// http://stackoverflow.com/questions/12436669/using-protocol-relative-uris-within-location-headers
// which discusses this.
return TemporaryRedirect(
"//" + hostname.get() + ":" + stringify(info.port()));
}
string Master::Http::RESERVE_HELP()
{
return HELP(
TLDR(
"Reserve resources dynamically on a specific slave."),
DESCRIPTION(
"Returns 200 OK if the request was accepted. This does not",
"imply that the requested resources have been reserved successfully:",
"resource reservation is done asynchronously and may fail.",
"",
"Please provide \"slaveId\" and \"resources\" values designating",
"the resources to be reserved."));
}
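// Illustrative only: the `reserve()` handler below expects 'slaveId' and a
// 'resources' JSON array of `Resource` objects whose `ReservationInfo`
// names the reserving principal; all values are hypothetical.
//
//   slaveId=<slave-id>&resources=[
//     {
//       "name": "cpus",
//       "type": "SCALAR",
//       "scalar": {"value": 1},
//       "role": "some-role",
//       "reservation": {"principal": "some-principal"}
//     }
//   ]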
Future<Response> Master::Http::reserve(
const Request& request,
const Option<string>& principal) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the query string in the request body.
Try<hashmap<string, string>> decode =
process::http::query::decode(request.body);
if (decode.isError()) {
return BadRequest("Unable to decode query string: " + decode.error());
}
const hashmap<string, string>& values = decode.get();
if (values.get("slaveId").isNone()) {
return BadRequest("Missing 'slaveId' query parameter");
}
SlaveID slaveId;
slaveId.set_value(values.get("slaveId").get());
Slave* slave = master->slaves.registered.get(slaveId);
if (slave == NULL) {
return BadRequest("No slave found with specified ID");
}
if (values.get("resources").isNone()) {
return BadRequest("Missing 'resources' query parameter");
}
Try<JSON::Array> parse =
JSON::parse<JSON::Array>(values.get("resources").get());
if (parse.isError()) {
return BadRequest(
"Error in parsing 'resources' query parameter: " + parse.error());
}
Resources resources;
foreach (const JSON::Value& value, parse.get().values) {
Try<Resource> resource = ::protobuf::parse<Resource>(value);
if (resource.isError()) {
return BadRequest(
"Error in parsing 'resources' query parameter: " + resource.error());
}
resources += resource.get();
}
// Create an offer operation.
Offer::Operation operation;
operation.set_type(Offer::Operation::RESERVE);
operation.mutable_reserve()->mutable_resources()->CopyFrom(resources);
Option<Error> error = validation::operation::validate(
operation.reserve(), principal);
if (error.isSome()) {
return BadRequest("Invalid RESERVE operation: " + error.get().message);
}
return master->authorizeReserveResources(operation.reserve(), principal)
.then(defer(master->self(), [=](bool authorized) -> Future<Response> {
if (!authorized) {
return Forbidden();
}
// NOTE: `flatten()` is important. To make a dynamic reservation,
// we want to ensure that the required resources are available
// and unreserved; `flatten()` removes the role and
// ReservationInfo from the resources.
return _operation(slaveId, resources.flatten(), operation);
}));
}
string Master::Http::SLAVES_HELP()
{
return HELP(
TLDR(
"Information about registered slaves."),
DESCRIPTION(
"This endpoint shows information about the slaves registered with",
"this master, formatted as a JSON object."));
}
Future<Response> Master::Http::slaves(const Request& request) const
{
auto slaves = [this](JSON::ObjectWriter* writer) {
writer->field("slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (const Slave* slave, master->slaves.registered) {
writer->element([&slave](JSON::ObjectWriter* writer) {
json(writer, Full<Slave>(*slave));
// Add the complete protobuf->JSON for all used, reserved,
// and offered resources. The other endpoints summarize
// resource information, which omits the details of
// reservations and persistent volumes. Full resource
// information is necessary so that operators can use the
// `/unreserve` and `/destroy-volumes` endpoints.
hashmap<string, Resources> reserved =
slave->totalResources.reserved();
writer->field(
"reserved_resources_full",
[&reserved](JSON::ObjectWriter* writer) {
foreachpair (const string& role,
const Resources& resources,
reserved) {
writer->field(role, [&resources](JSON::ArrayWriter* writer) {
foreach (const Resource& resource, resources) {
writer->element(JSON::Protobuf(resource));
}
});
}
});
Resources usedResources = Resources::sum(slave->usedResources);
writer->field(
"used_resources_full",
[&usedResources](JSON::ArrayWriter* writer) {
foreach (const Resource& resource, usedResources) {
writer->element(JSON::Protobuf(resource));
}
});
const Resources& offeredResources = slave->offeredResources;
writer->field(
"offered_resources_full",
[&offeredResources](JSON::ArrayWriter* writer) {
foreach (const Resource& resource, offeredResources) {
writer->element(JSON::Protobuf(resource));
}
});
});
}
});
};
return OK(jsonify(slaves), request.url.query.get("jsonp"));
}
string Master::Http::QUOTA_HELP()
{
return HELP(
TLDR(
"Gets, sets, or removes quota for a role."),
DESCRIPTION(
"GET: Returns the currently configured quotas.",
"POST: Validates the request body as JSON",
" and sets quota for a role.",
"DELETE: Removes the quota for a role."));
}
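// Illustrative only: the request/response formats are defined by the
// `QuotaHandler` and the quota protobufs, not in this file. A set-quota
// POST body is commonly of the following shape; the values are
// hypothetical.
//
//   {
//     "role": "some-role",
//     "guarantee": [
//       {"name": "cpus", "type": "SCALAR", "scalar": {"value": 1}}
//     ]
//   }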
Future<Response> Master::Http::quota(
const Request& request,
const Option<string>& principal) const
{
// Dispatch based on HTTP method to separate `QuotaHandler`.
if (request.method == "GET") {
return quotaHandler.status(request);
}
if (request.method == "POST") {
return quotaHandler.set(request, principal);
}
if (request.method == "DELETE") {
return quotaHandler.remove(request, principal);
}
// TODO(joerg84): Add update logic for PUT requests
// once Quota supports updates.
return MethodNotAllowed(
{"GET", "POST", "DELETE"},
"Expecting 'GET', 'POST' or 'DELETE', received '" + request.method + "'");
}
string Master::Http::STATE_HELP()
{
return HELP(
TLDR(
"Information about the state of the master."),
DESCRIPTION(
"This endpoint shows information about the frameworks, tasks,",
"executors, and slaves running in the cluster as a JSON object.",
"",
"Example (**Note**: this is not exhaustive):",
"",
"```",
"{",
" \"version\" : \"0.28.0\",",
" \"git_sha\" : \"9d5889b5a265849886a533965f4aefefd1fbd103\",",
" \"git_branch\" : \"refs/heads/master\",",
" \"git_tag\" : \"0.28.0\",",
" \"build_date\" : \"2016-02-15 10:00:28\",",
" \"build_time\" : 1455559228,",
" \"build_user\" : \"mesos-user\",",
" \"start_time\" : 1455643643.42422,",
" \"elected_time\" : 1455643643.43457,",
" \"id\" : \"b5eac2c5-609b-4ca1-a352-61941702fc9e\",",
" \"pid\" : \"master@127.0.0.1:5050\",",
" \"hostname\" : \"localhost\",",
" \"activated_slaves\" : 0,",
" \"deactivated_slaves\" : 0,",
" \"cluster\" : \"test-cluster\",",
" \"leader\" : \"master@127.0.0.1:5050\",",
" \"log_dir\" : \"/var/log\",",
" \"external_log_file\" : \"mesos.log\",",
" \"flags\" : {",
" \"framework_sorter\" : \"drf\",",
" \"authenticate\" : \"false\",",
" \"logbufsecs\" : \"0\",",
" \"initialize_driver_logging\" : \"true\",",
" \"work_dir\" : \"/var/lib/mesos\",",
" \"http_authenticators\" : \"basic\",",
" \"authorizers\" : \"local\",",
" \"slave_reregister_timeout\" : \"10mins\",",
" \"logging_level\" : \"INFO\",",
" \"help\" : \"false\",",
" \"root_submissions\" : \"true\",",
" \"ip\" : \"127.0.0.1\",",
" \"user_sorter\" : \"drf\",",
" \"version\" : \"false\",",
" \"max_slave_ping_timeouts\" : \"5\",",
" \"slave_ping_timeout\" : \"15secs\",",
" \"registry_store_timeout\" : \"20secs\",",
" \"max_completed_frameworks\" : \"50\",",
" \"quiet\" : \"false\",",
" \"allocator\" : \"HierarchicalDRF\",",
" \"hostname_lookup\" : \"true\",",
" \"authenticators\" : \"crammd5\",",
" \"max_completed_tasks_per_framework\" : \"1000\",",
" \"registry\" : \"replicated_log\",",
" \"registry_strict\" : \"false\",",
" \"log_auto_initialize\" : \"true\",",
" \"authenticate_slaves\" : \"false\",",
" \"registry_fetch_timeout\" : \"1mins\",",
" \"allocation_interval\" : \"1secs\",",
" \"authenticate_http\" : \"false\",",
" \"port\" : \"5050\",",
" \"zk_session_timeout\" : \"10secs\",",
" \"recovery_slave_removal_limit\" : \"100%\",",
" \"webui_dir\" : \"/path/to/mesos/build/../src/webui\",",
" \"cluster\" : \"mycluster\",",
" \"leader\" : \"master@127.0.0.1:5050\",",
" \"log_dir\" : \"/var/log\",",
" \"external_log_file\" : \"mesos.log\"",
" },",
" \"slaves\" : [],",
" \"frameworks\" : [],",
" \"completed_frameworks\" : [],",
" \"orphan_tasks\" : [],",
" \"unregistered_frameworks\" : []",
"}",
"```"));
}
Future<Response> Master::Http::state(const Request& request) const
{
auto state = [this](JSON::ObjectWriter* writer) {
writer->field("version", MESOS_VERSION);
if (build::GIT_SHA.isSome()) {
writer->field("git_sha", build::GIT_SHA.get());
}
if (build::GIT_BRANCH.isSome()) {
writer->field("git_branch", build::GIT_BRANCH.get());
}
if (build::GIT_TAG.isSome()) {
writer->field("git_tag", build::GIT_TAG.get());
}
writer->field("build_date", build::DATE);
writer->field("build_time", build::TIME);
writer->field("build_user", build::USER);
writer->field("start_time", master->startTime.secs());
if (master->electedTime.isSome()) {
writer->field("elected_time", master->electedTime.get().secs());
}
writer->field("id", master->info().id());
writer->field("pid", string(master->self()));
writer->field("hostname", master->info().hostname());
writer->field("activated_slaves", master->_slaves_active());
writer->field("deactivated_slaves", master->_slaves_inactive());
if (master->flags.cluster.isSome()) {
writer->field("cluster", master->flags.cluster.get());
}
if (master->leader.isSome()) {
writer->field("leader", master->leader.get().pid());
}
if (master->flags.log_dir.isSome()) {
writer->field("log_dir", master->flags.log_dir.get());
}
if (master->flags.external_log_file.isSome()) {
writer->field("external_log_file", master->flags.external_log_file.get());
}
writer->field("flags", [this](JSON::ObjectWriter* writer) {
foreachpair (const string& name, const flags::Flag& flag, master->flags) {
Option<string> value = flag.stringify(master->flags);
if (value.isSome()) {
writer->field(name, value.get());
}
}
});
// Model all of the slaves.
writer->field("slaves", [this](JSON::ArrayWriter* writer) {
foreachvalue (Slave* slave, master->slaves.registered) {
writer->element(Full<Slave>(*slave));
}
});
// Model all of the frameworks.
writer->field("frameworks", [this](JSON::ArrayWriter* writer) {
foreachvalue (Framework* framework, master->frameworks.registered) {
writer->element(Full<Framework>(*framework));
}
});
// Model all of the completed frameworks.
writer->field("completed_frameworks", [this](JSON::ArrayWriter* writer) {
foreach (const std::shared_ptr<Framework>& framework,
master->frameworks.completed) {
writer->element(Full<Framework>(*framework));
}
});
// Model all of the orphan tasks.
writer->field("orphan_tasks", [this](JSON::ArrayWriter* writer) {
// Find those orphan tasks.
foreachvalue (const Slave* slave, master->slaves.registered) {
typedef hashmap<TaskID, Task*> TaskMap;
foreachvalue (const TaskMap& tasks, slave->tasks) {
foreachvalue (const Task* task, tasks) {
CHECK_NOTNULL(task);
if (!master->frameworks.registered.contains(task->framework_id())) {
writer->element(*task);
}
}
}
}
});
// Model all currently unregistered frameworks. This can happen
// when a framework has yet to re-register after master failover.
writer->field("unregistered_frameworks", [this](JSON::ArrayWriter* writer) {
// Find unregistered frameworks.
foreachvalue (const Slave* slave, master->slaves.registered) {
foreachkey (const FrameworkID& frameworkId, slave->tasks) {
if (!master->frameworks.registered.contains(frameworkId)) {
writer->element(frameworkId.value());
}
}
}
});
};
return OK(jsonify(state), request.url.query.get("jsonp"));
}
// This abstraction has no side-effects. It factors out computing the
// mapping from 'slaves' to 'frameworks' to answer the questions 'what
// frameworks are running on a given slave?' and 'what slaves are
// running the given framework?'.
class SlaveFrameworkMapping
{
public:
SlaveFrameworkMapping(const hashmap<FrameworkID, Framework*>& frameworks)
{
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
frameworksToSlaves[frameworkId].insert(taskInfo.slave_id());
slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId);
}
foreachvalue (const Task* task, framework->tasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
frameworksToSlaves[frameworkId].insert(task->slave_id());
slavesToFrameworks[task->slave_id()].insert(frameworkId);
}
}
}
const hashset<FrameworkID>& frameworks(const SlaveID& slaveId) const
{
const auto iterator = slavesToFrameworks.find(slaveId);
return iterator != slavesToFrameworks.end() ?
iterator->second : hashset<FrameworkID>::EMPTY;
}
const hashset<SlaveID>& slaves(const FrameworkID& frameworkId) const
{
const auto iterator = frameworksToSlaves.find(frameworkId);
return iterator != frameworksToSlaves.end() ?
iterator->second : hashset<SlaveID>::EMPTY;
}
private:
hashmap<SlaveID, hashset<FrameworkID>> slavesToFrameworks;
hashmap<FrameworkID, hashset<SlaveID>> frameworksToSlaves;
};
// This abstraction has no side-effects. It factors out the accounting
// for a 'TaskState' summary. We use this to summarize 'TaskState's
// for both frameworks as well as slaves.
struct TaskStateSummary
{
// TODO(jmlvanre): Possibly clean this up as per MESOS-2694.
const static TaskStateSummary EMPTY;
TaskStateSummary()
: staging(0),
starting(0),
running(0),
killing(0),
finished(0),
killed(0),
failed(0),
lost(0),
error(0) {}
// Account for the state of the given task.
void count(const Task& task)
{
switch (task.state()) {
case TASK_STAGING: { ++staging; break; }
case TASK_STARTING: { ++starting; break; }
case TASK_RUNNING: { ++running; break; }
case TASK_KILLING: { ++killing; break; }
case TASK_FINISHED: { ++finished; break; }
case TASK_KILLED: { ++killed; break; }
case TASK_FAILED: { ++failed; break; }
case TASK_LOST: { ++lost; break; }
case TASK_ERROR: { ++error; break; }
// We intentionally omit a default case here so that the compiler
// emits a helpful error if we introduce a new state.
}
}
size_t staging;
size_t starting;
size_t running;
size_t killing;
size_t finished;
size_t killed;
size_t failed;
size_t lost;
size_t error;
};
const TaskStateSummary TaskStateSummary::EMPTY;
// This abstraction has no side-effects. It factors out computing the
// 'TaskState' summaries for frameworks and slaves. This answers the
// questions 'How many tasks are in each state for a given framework?'
// and 'How many tasks are in each state for a given slave?'.
class TaskStateSummaries
{
public:
TaskStateSummaries(const hashmap<FrameworkID, Framework*>& frameworks)
{
foreachpair (const FrameworkID& frameworkId,
const Framework* framework,
frameworks) {
foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
frameworkTaskSummaries[frameworkId].staging++;
slaveTaskSummaries[taskInfo.slave_id()].staging++;
}
foreachvalue (const Task* task, framework->tasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
frameworkTaskSummaries[frameworkId].count(*task);
slaveTaskSummaries[task->slave_id()].count(*task);
}
}
}
const TaskStateSummary& framework(const FrameworkID& frameworkId) const
{
const auto iterator = frameworkTaskSummaries.find(frameworkId);
return iterator != frameworkTaskSummaries.end() ?
iterator->second : TaskStateSummary::EMPTY;
}
const TaskStateSummary& slave(const SlaveID& slaveId) const
{
const auto iterator = slaveTaskSummaries.find(slaveId);
return iterator != slaveTaskSummaries.end() ?
iterator->second : TaskStateSummary::EMPTY;
}
private:
hashmap<FrameworkID, TaskStateSummary> frameworkTaskSummaries;
hashmap<SlaveID, TaskStateSummary> slaveTaskSummaries;
};
string Master::Http::STATESUMMARY_HELP()
{
return HELP(
TLDR(
"Summary of the state of all tasks and registered frameworks in the cluster."),
DESCRIPTION(
"This endpoint gives a summary of the state of all tasks and",
"registered frameworks in the cluster as a JSON object."));
}
Future<Response> Master::Http::stateSummary(const Request& request) const
{
auto stateSummary = [this](JSON::ObjectWriter* writer) {
writer->field("hostname", master->info().hostname());
if (master->flags.cluster.isSome()) {
writer->field("cluster", master->flags.cluster.get());
}
// We use the tasks in the 'Frameworks' struct to compute summaries
// for this endpoint. This is done 1) for consistency between the
// 'slaves' and 'frameworks' subsections below, 2) because we want to
// provide summary information for frameworks that are currently
// registered, and 3) because the frameworks keep a circular buffer of
// completed tasks that gives us a limited view of the history of
// recently completed / failed tasks.
// Generate mappings from 'slave' to 'framework' and reverse.
SlaveFrameworkMapping slaveFrameworkMapping(master->frameworks.registered);
// Generate 'TaskState' summaries for all framework and slave ids.
TaskStateSummaries taskStateSummaries(master->frameworks.registered);
// Model all of the slaves.
writer->field("slaves", [this,
&slaveFrameworkMapping,
&taskStateSummaries](JSON::ArrayWriter* writer) {
foreachvalue (Slave* slave, master->slaves.registered) {
writer->element([&slave,
&slaveFrameworkMapping,
&taskStateSummaries](JSON::ObjectWriter* writer) {
json(writer, Summary<Slave>(*slave));
// Add the 'TaskState' summary for this slave.
const TaskStateSummary& summary = taskStateSummaries.slave(slave->id);
writer->field("TASK_STAGING", summary.staging);
writer->field("TASK_STARTING", summary.starting);
writer->field("TASK_RUNNING", summary.running);
writer->field("TASK_KILLING", summary.killing);
writer->field("TASK_FINISHED", summary.finished);
writer->field("TASK_KILLED", summary.killed);
writer->field("TASK_FAILED", summary.failed);
writer->field("TASK_LOST", summary.lost);
writer->field("TASK_ERROR", summary.error);
// Add the ids of all the frameworks running on this slave.
const hashset<FrameworkID>& frameworks =
slaveFrameworkMapping.frameworks(slave->id);
writer->field("framework_ids",
[&frameworks](JSON::ArrayWriter* writer) {
foreach (const FrameworkID& frameworkId, frameworks) {
writer->element(frameworkId.value());
}
});
});
}
});
// Model all of the frameworks.
writer->field("frameworks",
[this,
&slaveFrameworkMapping,
&taskStateSummaries](JSON::ArrayWriter* writer) {
foreachpair (const FrameworkID& frameworkId,
Framework* framework,
master->frameworks.registered) {
writer->element([&frameworkId,
&framework,
&slaveFrameworkMapping,
&taskStateSummaries](JSON::ObjectWriter* writer) {
json(writer, Summary<Framework>(*framework));
// Add the 'TaskState' summary for this framework.
const TaskStateSummary& summary =
taskStateSummaries.framework(frameworkId);
writer->field("TASK_STAGING", summary.staging);
writer->field("TASK_STARTING", summary.starting);
writer->field("TASK_RUNNING", summary.running);
writer->field("TASK_KILLING", summary.killing);
writer->field("TASK_FINISHED", summary.finished);
writer->field("TASK_KILLED", summary.killed);
writer->field("TASK_FAILED", summary.failed);
writer->field("TASK_LOST", summary.lost);
writer->field("TASK_ERROR", summary.error);
// Add the ids of all the slaves running this framework.
const hashset<SlaveID>& slaves =
slaveFrameworkMapping.slaves(frameworkId);
writer->field("slave_ids", [&slaves](JSON::ArrayWriter* writer) {
foreach (const SlaveID& slaveId, slaves) {
writer->element(slaveId.value());
}
});
});
}
});
};
return OK(jsonify(stateSummary), request.url.query.get("jsonp"));
}
string Master::Http::ROLES_HELP()
{
return HELP(
TLDR(
"Information about roles."),
DESCRIPTION(
"This endpoint provides information about roles as a JSON object.",
"It returns information about every role that is on the role",
"whitelist (if enabled), has one or more registered frameworks,",
"or has a non-default weight or quota. For each role, it returns",
"the weight, total allocated resources, and registered frameworks."));
}
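// Illustrative only: per the `model()` helper below, each role entry in
// the response carries "name", "weight", "resources" (the role's
// summarized resources) and "frameworks" (registered framework IDs); the
// values here are hypothetical.
//
//   {
//     "roles": [
//       {
//         "name": "*",
//         "weight": 1.0,
//         "resources": {},
//         "frameworks": []
//       }
//     ]
//   }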
// Returns a JSON object modeled after a role.
JSON::Object model(
const string& name,
Option<double> weight,
Option<Role*> _role)
{
JSON::Object object;
object.values["name"] = name;
if (weight.isSome()) {
object.values["weight"] = weight.get();
} else {
object.values["weight"] = 1.0; // Default weight.
}
if (_role.isNone()) {
object.values["resources"] = model(Resources());
object.values["frameworks"] = JSON::Array();
} else {
Role* role = _role.get();
object.values["resources"] = model(role->resources());
{
JSON::Array array;
foreachkey (const FrameworkID& frameworkId, role->frameworks) {
array.values.push_back(frameworkId.value());
}
object.values["frameworks"] = std::move(array);
}
}
return object;
}
Future<Response> Master::Http::roles(const Request& request) const
{
JSON::Object object;
// Compute the role names to return results for. When an explicit
// role whitelist has been configured, we use that list of names.
// When using implicit roles, the right behavior is a bit more
// subtle. There are no constraints on possible role names, so we
// instead list all the "interesting" roles: the default role ("*"),
// all roles with one or more registered frameworks, and all roles
// with a non-default weight or quota.
//
// NOTE: we use a `std::set` to store the role names to ensure a
// deterministic output order.
set<string> roleList;
if (master->roleWhitelist.isSome()) {
const hashset<string>& whitelist = master->roleWhitelist.get();
roleList.insert(whitelist.begin(), whitelist.end());
} else {
roleList.insert("*"); // Default role.
roleList.insert(
master->activeRoles.keys().begin(),
master->activeRoles.keys().end());
roleList.insert(
master->weights.keys().begin(),
master->weights.keys().end());
roleList.insert(
master->quotas.keys().begin(),
master->quotas.keys().end());
}
{
JSON::Array array;
foreach (const string& name, roleList) {
Option<double> weight = None();
if (master->weights.contains(name)) {
weight = master->weights[name];
}
Option<Role*> role = None();
if (master->activeRoles.contains(name)) {
role = master->activeRoles[name];
}
array.values.push_back(model(name, weight, role));
}
object.values["roles"] = std::move(array);
}
return OK(object, request.url.query.get("jsonp"));
}
string Master::Http::TEARDOWN_HELP()
{
return HELP(
TLDR(
"Tears down a running framework by shutting down all tasks/executors "
"and removing the framework."),
DESCRIPTION(
"Please provide a \"frameworkId\" value designating the running",
"framework to tear down.",
"Returns 200 OK if the framework was correctly torn down."));
}
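// Illustrative only: the `teardown()` handler below expects a form-encoded
// POST body naming the framework to tear down; the value is hypothetical.
//
//   frameworkId=<framework-id>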
Future<Response> Master::Http::teardown(
const Request& request,
const Option<string>& principal) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the query string in the request body (since this is a POST)
// in order to determine the framework ID to shut down.
Try<hashmap<string, string>> decode =
process::http::query::decode(request.body);
if (decode.isError()) {
return BadRequest("Unable to decode query string: " + decode.error());
}
hashmap<string, string> values = decode.get();
if (values.get("frameworkId").isNone()) {
return BadRequest("Missing 'frameworkId' query parameter");
}
FrameworkID id;
id.set_value(values.get("frameworkId").get());
Framework* framework = master->getFramework(id);
if (framework == NULL) {
return BadRequest("No framework found with specified ID");
}
// Skip authorization if no ACLs were provided to the master.
if (master->authorizer.isNone()) {
return _teardown(id);
}
mesos::ACL::ShutdownFramework shutdown;
if (principal.isSome()) {
shutdown.mutable_principals()->add_values(principal.get());
} else {
shutdown.mutable_principals()->set_type(ACL::Entity::ANY);
}
if (framework->info.has_principal()) {
shutdown.mutable_framework_principals()->add_values(
framework->info.principal());
} else {
shutdown.mutable_framework_principals()->set_type(ACL::Entity::ANY);
}
return master->authorizer.get()->authorize(shutdown)
.then(defer(master->self(), [=](bool authorized) -> Future<Response> {
if (!authorized) {
return Forbidden();
}
return _teardown(id);
}));
}
Future<Response> Master::Http::_teardown(const FrameworkID& id) const
{
Framework* framework = master->getFramework(id);
if (framework == NULL) {
return BadRequest("No framework found with ID " + stringify(id));
}
// TODO(ijimenez): Do 'removeFramework' asynchronously.
master->removeFramework(framework);
return OK();
}
string Master::Http::TASKS_HELP()
{
return HELP(
TLDR(
"Lists tasks from all active frameworks."),
DESCRIPTION(
"Lists known tasks.",
"",
"Query parameters:",
"",
"> limit=VALUE Maximum number of tasks returned "
"(default is " + stringify(TASK_LIMIT) + ").",
"> offset=VALUE Starts task list at offset.",
"> order=(asc|desc) Ascending or descending sort order "
"(default is descending)."
""));
}
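// Illustrative only: the query parameters understood by the `tasks()`
// handler below, appended to the endpoint URL; the values are
// hypothetical.
//
//   ?limit=100&offset=0&order=asc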
struct TaskComparator
{
static bool ascending(const Task* lhs, const Task* rhs)
{
size_t lhsSize = lhs->statuses().size();
size_t rhsSize = rhs->statuses().size();
if ((lhsSize == 0) && (rhsSize == 0)) {
return false;
}
if (lhsSize == 0) {
return true;
}
if (rhsSize == 0) {
return false;
}
return (lhs->statuses(0).timestamp() < rhs->statuses(0).timestamp());
}
static bool descending(const Task* lhs, const Task* rhs)
{
size_t lhsSize = lhs->statuses().size();
size_t rhsSize = rhs->statuses().size();
if ((lhsSize == 0) && (rhsSize == 0)) {
return false;
}
if (rhsSize == 0) {
return true;
}
if (lhsSize == 0) {
return false;
}
return (lhs->statuses(0).timestamp() > rhs->statuses(0).timestamp());
}
};
Future<Response> Master::Http::tasks(const Request& request) const
{
// Get list options (limit and offset).
Result<int> result = numify<int>(request.url.query.get("limit"));
size_t limit = result.isSome() ? result.get() : TASK_LIMIT;
result = numify<int>(request.url.query.get("offset"));
size_t offset = result.isSome() ? result.get() : 0;
// TODO(nnielsen): Currently, formatting errors in offset and/or limit
// will silently be ignored. This could be reported to the user instead.
// Construct framework list with both active and completed frameworks.
vector<const Framework*> frameworks;
foreachvalue (Framework* framework, master->frameworks.registered) {
frameworks.push_back(framework);
}
foreach (const std::shared_ptr<Framework>& framework,
master->frameworks.completed) {
frameworks.push_back(framework.get());
}
// Construct task list with both running and finished tasks.
vector<const Task*> tasks;
foreach (const Framework* framework, frameworks) {
foreachvalue (Task* task, framework->tasks) {
CHECK_NOTNULL(task);
tasks.push_back(task);
}
foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
tasks.push_back(task.get());
}
}
// Sort tasks by task status timestamp. Default order is descending.
// The earliest timestamp is chosen for comparison when multiple are present.
Option<string> order = request.url.query.get("order");
if (order.isSome() && (order.get() == "asc")) {
sort(tasks.begin(), tasks.end(), TaskComparator::ascending);
} else {
sort(tasks.begin(), tasks.end(), TaskComparator::descending);
}
JSON::Object object;
{
JSON::Array array;
size_t end = std::min(offset + limit, tasks.size());
for (size_t i = offset; i < end; i++) {
const Task* task = tasks[i];
array.values.push_back(model(*task));
}
object.values["tasks"] = std::move(array);
}
return OK(object, request.url.query.get("jsonp"));
}
// /master/maintenance/schedule endpoint help.
string Master::Http::MAINTENANCE_SCHEDULE_HELP()
{
return HELP(
TLDR(
"Returns or updates the cluster's maintenance schedule."),
DESCRIPTION(
"GET: Returns the current maintenance schedule as JSON.",
"POST: Validates the request body as JSON",
" and updates the maintenance schedule."));
}
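// Illustrative only: a minimal maintenance schedule in JSON form for the
// POST case handled below, using the `windows`, `machine_ids` and
// `unavailability` fields referenced by the handler; the values are
// hypothetical.
//
//   {
//     "windows": [
//       {
//         "machine_ids": [{"hostname": "host1", "ip": "10.0.0.1"}],
//         "unavailability": {"start": {"nanoseconds": 0}}
//       }
//     ]
//   }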
// /master/maintenance/schedule endpoint handler.
Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
{
if (request.method != "GET" && request.method != "POST") {
return MethodNotAllowed(
{"GET", "POST"},
"Expecting 'GET' or 'POST', received '" + request.method + "'");
}
// JSON-ify and return the current maintenance schedule.
if (request.method == "GET") {
// TODO(josephw): Return more than one schedule.
const mesos::maintenance::Schedule schedule =
master->maintenance.schedules.empty() ?
mesos::maintenance::Schedule() :
master->maintenance.schedules.front();
return OK(JSON::protobuf(schedule), request.url.query.get("jsonp"));
}
// Parse the POST body as JSON.
Try<JSON::Object> jsonSchedule = JSON::parse<JSON::Object>(request.body);
if (jsonSchedule.isError()) {
return BadRequest(jsonSchedule.error());
}
// Convert the schedule to a protobuf.
Try<mesos::maintenance::Schedule> protoSchedule =
::protobuf::parse<mesos::maintenance::Schedule>(jsonSchedule.get());
if (protoSchedule.isError()) {
return BadRequest(protoSchedule.error());
}
// Validate that the schedule only transitions machines between
// `UP` and `DRAINING` modes.
mesos::maintenance::Schedule schedule = protoSchedule.get();
Try<Nothing> isValid = maintenance::validation::schedule(
schedule,
master->machines);
if (isValid.isError()) {
return BadRequest(isValid.error());
}
return master->registrar->apply(Owned<Operation>(
new maintenance::UpdateSchedule(schedule)))
.then(defer(master->self(), [=](bool result) -> Future<Response> {
// See the top comment in "master/maintenance.hpp" for why this check
// is here, and is appropriate.
CHECK(result);
// Update the master's local state with the new schedule.
// NOTE: We only add or remove differences between the current schedule
// and the new schedule. This is because the `MachineInfo` struct
// holds more information than a maintenance schedule.
// For example, the `mode` field is not part of a maintenance schedule.
// TODO(josephw): allow more than one schedule.
// Put the machines in the updated schedule into a set.
// Save the unavailability, to help with updating some machines.
hashmap<MachineID, Unavailability> unavailabilities;
foreach (const mesos::maintenance::Window& window, schedule.windows()) {
foreach (const MachineID& id, window.machine_ids()) {
unavailabilities[id] = window.unavailability();
}
}
// NOTE: Copies are needed because `updateUnavailability()` in this loop
// modifies the container.
foreachkey (const MachineID& id, utils::copy(master->machines)) {
// Update the `unavailability` for each existing machine, except for
// machines going from `UP` to `DRAINING` (handled in the next loop).
// Each machine will only be touched by 1 of the 2 loops here to
// avoid sending inverse offers twice for a single machine, since
// `updateUnavailability` will trigger an inverse offer.
// TODO(gyliu513): Merge this logic with `Master::updateUnavailability`,
// having it in two places results in more conditionals to handle.
if (unavailabilities.contains(id)) {
if (master->machines[id].info.mode() == MachineInfo::UP) {
continue;
}
master->updateUnavailability(id, unavailabilities[id]);
continue;
}
// Transition each removed machine back to the `UP` mode and remove the
// unavailability.
master->machines[id].info.set_mode(MachineInfo::UP);
master->updateUnavailability(id, None());
}
// Save each new machine, with the unavailability
// and starting in `DRAINING` mode.
foreach (const mesos::maintenance::Window& window, schedule.windows()) {
foreach (const MachineID& id, window.machine_ids()) {
if (master->machines.contains(id) &&
master->machines[id].info.mode() != MachineInfo::UP) {
continue;
}
MachineInfo info;
info.mutable_id()->CopyFrom(id);
info.set_mode(MachineInfo::DRAINING);
master->machines[id].info.CopyFrom(info);
master->updateUnavailability(id, window.unavailability());
}
}
// Replace the old schedule(s) with the new schedule.
master->maintenance.schedules.clear();
master->maintenance.schedules.push_back(schedule);
return OK();
}));
}
// /master/machine/down endpoint help.
string Master::Http::MACHINE_DOWN_HELP()
{
return HELP(
TLDR(
"Brings a set of machines down."),
DESCRIPTION(
"POST: Validates the request body as JSON and transitions",
" the list of machines into DOWN mode. Currently, only",
" machines in DRAINING mode are allowed to be brought down."));
}
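// NOTE: The following example is illustrative only (host names and IPs
// are made up). The POST body is a JSON array of `MachineID` objects,
// e.g.:
//
//   [
//     { "hostname" : "machine1", "ip" : "10.0.0.1" },
//     { "hostname" : "machine2", "ip" : "10.0.0.2" }
//   ]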
// /master/machine/down endpoint handler.
Future<Response> Master::Http::machineDown(const Request& request) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the POST body as JSON.
Try<JSON::Array> jsonIds = JSON::parse<JSON::Array>(request.body);
if (jsonIds.isError()) {
return BadRequest(jsonIds.error());
}
// Convert the machine IDs to protobufs.
auto ids = ::protobuf::parse<RepeatedPtrField<MachineID>>(jsonIds.get());
if (ids.isError()) {
return BadRequest(ids.error());
}
// Validate every machine in the list.
Try<Nothing> isValid = maintenance::validation::machines(ids.get());
if (isValid.isError()) {
return BadRequest(isValid.error());
}
// Check that all machines are part of a maintenance schedule.
// TODO(josephw): Allow a transition from `UP` to `DOWN`.
foreach (const MachineID& id, ids.get()) {
if (!master->machines.contains(id)) {
return BadRequest(
"Machine '" + stringify(JSON::protobuf(id)) +
"' is not part of a maintenance schedule");
}
if (master->machines[id].info.mode() != MachineInfo::DRAINING) {
return BadRequest(
"Machine '" + stringify(JSON::protobuf(id)) +
"' is not in DRAINING mode and cannot be brought down");
}
}
return master->registrar->apply(Owned<Operation>(
new maintenance::StartMaintenance(ids.get())))
.then(defer(master->self(), [=](bool result) -> Future<Response> {
// See the top comment in "master/maintenance.hpp" for why this check
// is here and why it is appropriate.
CHECK(result);
// We currently send a `ShutdownMessage` to each slave. This terminates
// all the executors for all the frameworks running on that slave.
// We also manually remove the slave to force sending `TASK_LOST` updates
// for all the tasks that were running on the slave and `LostSlaveMessage`
// messages to the frameworks. This guards against the slave having dropped
// the `ShutdownMessage`.
foreach (const MachineID& machineId, ids.get()) {
// The machine may not be in `machines`. This means no slaves are
// currently registered on that machine, so this is a no-op.
if (master->machines.contains(machineId)) {
// NOTE: Copies are needed because removeSlave modifies
// master->machines.
foreach (
const SlaveID& slaveId,
utils::copy(master->machines[machineId].slaves)) {
Slave* slave = master->slaves.registered.get(slaveId);
CHECK_NOTNULL(slave);
// Tell the slave to shut down.
ShutdownMessage shutdownMessage;
shutdownMessage.set_message("Operator initiated 'Machine DOWN'");
master->send(slave->pid, shutdownMessage);
// Immediately remove the slave to force sending `TASK_LOST` status
// updates as well as `LostSlaveMessage` messages to the frameworks.
// See comment above.
master->removeSlave(slave, "Operator initiated 'Machine DOWN'");
}
}
}
// Update the master's local state with the downed machines.
foreach (const MachineID& id, ids.get()) {
master->machines[id].info.set_mode(MachineInfo::DOWN);
}
return OK();
}));
}
// /master/machine/up endpoint help.
string Master::Http::MACHINE_UP_HELP()
{
return HELP(
TLDR(
"Brings a set of machines back up."),
DESCRIPTION(
"POST: Validates the request body as JSON and transitions",
" the list of machines into UP mode. This also removes",
" the list of machines from the maintenance schedule."));
}
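// NOTE: As with the /machine/down endpoint, the POST body is a JSON array
// of `MachineID` objects (illustrative values only), e.g.:
//
//   [
//     { "hostname" : "machine1", "ip" : "10.0.0.1" }
//   ]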
// /master/machine/up endpoint handler.
Future<Response> Master::Http::machineUp(const Request& request) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the POST body as JSON.
Try<JSON::Array> jsonIds = JSON::parse<JSON::Array>(request.body);
if (jsonIds.isError()) {
return BadRequest(jsonIds.error());
}
// Convert the machine IDs to protobufs.
auto ids = ::protobuf::parse<RepeatedPtrField<MachineID>>(jsonIds.get());
if (ids.isError()) {
return BadRequest(ids.error());
}
// Validate every machine in the list.
Try<Nothing> isValid = maintenance::validation::machines(ids.get());
if (isValid.isError()) {
return BadRequest(isValid.error());
}
// Check that all machines are part of a maintenance schedule.
foreach (const MachineID& id, ids.get()) {
if (!master->machines.contains(id)) {
return BadRequest(
"Machine '" + stringify(JSON::protobuf(id)) +
"' is not part of a maintenance schedule");
}
if (master->machines[id].info.mode() != MachineInfo::DOWN) {
return BadRequest(
"Machine '" + stringify(JSON::protobuf(id)) +
"' is not in DOWN mode and cannot be brought up");
}
}
return master->registrar->apply(Owned<Operation>(
new maintenance::StopMaintenance(ids.get())))
.then(defer(master->self(), [=](bool result) -> Future<Response> {
// See the top comment in "master/maintenance.hpp" for why this check
// is here and why it is appropriate.
CHECK(result);
// Update the master's local state with the reactivated machines.
hashset<MachineID> updated;
foreach (const MachineID& id, ids.get()) {
master->machines[id].info.set_mode(MachineInfo::UP);
master->machines[id].info.clear_unavailability();
updated.insert(id);
}
// Delete the machines from the schedule.
for (list<mesos::maintenance::Schedule>::iterator schedule =
master->maintenance.schedules.begin();
schedule != master->maintenance.schedules.end();) {
for (int j = schedule->windows().size() - 1; j >= 0; j--) {
mesos::maintenance::Window* window = schedule->mutable_windows(j);
// Delete individual machines.
for (int k = window->machine_ids().size() - 1; k >= 0; k--) {
if (updated.contains(window->machine_ids(k))) {
window->mutable_machine_ids()->DeleteSubrange(k, 1);
}
}
// If the resulting window is empty, delete it.
if (window->machine_ids().size() == 0) {
schedule->mutable_windows()->DeleteSubrange(j, 1);
}
}
// If the resulting schedule is empty, delete it.
if (schedule->windows().size() == 0) {
schedule = master->maintenance.schedules.erase(schedule);
} else {
++schedule;
}
}
return OK();
}));
}
// /master/maintenance/status endpoint help.
string Master::Http::MAINTENANCE_STATUS_HELP()
{
return HELP(
TLDR(
"Retrieves the maintenance status of the cluster."),
DESCRIPTION(
"Returns an object with one list of machines per machine mode.",
"For draining machines, this list includes the frameworks' responses",
"to inverse offers. NOTE: Inverse offer responses are cleared if",
"the master fails over. However, new inverse offers will be sent",
"once the master recovers."));
}
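// NOTE: The following example is illustrative only (host names and IPs
// are made up). A response is the JSON representation of
// `mesos::maintenance::ClusterStatus`, where each entry in "statuses" is
// an `InverseOfferStatus`, e.g.:
//
//   {
//     "draining_machines" : [
//       {
//         "id" : { "hostname" : "machine1", "ip" : "10.0.0.1" },
//         "statuses" : []
//       }
//     ],
//     "down_machines" : [
//       { "hostname" : "machine2", "ip" : "10.0.0.2" }
//     ]
//   }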
// /master/maintenance/status endpoint handler.
Future<Response> Master::Http::maintenanceStatus(const Request& request) const
{
if (request.method != "GET") {
return MethodNotAllowed(
{"GET"}, "Expecting 'GET', received '" + request.method + "'");
}
return master->allocator->getInverseOfferStatuses()
.then(defer(
master->self(),
[=](
hashmap<
SlaveID,
hashmap<FrameworkID, mesos::master::InverseOfferStatus>> result)
-> Future<Response> {
// Unwrap the master's machine information into two arrays of machines.
// The data is coming from the allocator and therefore could be stale.
// Also, if the master fails over, this data is cleared.
mesos::maintenance::ClusterStatus status;
foreachpair (
const MachineID& id,
const Machine& machine,
master->machines) {
switch (machine.info.mode()) {
case MachineInfo::DRAINING: {
mesos::maintenance::ClusterStatus::DrainingMachine* drainingMachine =
status.add_draining_machines();
drainingMachine->mutable_id()->CopyFrom(id);
// Unwrap inverse offer status information from the allocator.
foreach (const SlaveID& slave, machine.slaves) {
if (result.contains(slave)) {
foreachvalue (
const mesos::master::InverseOfferStatus& status,
result[slave]) {
drainingMachine->add_statuses()->CopyFrom(status);
}
}
}
break;
}
case MachineInfo::DOWN: {
status.add_down_machines()->CopyFrom(id);
break;
}
// Currently, `UP` machines are not specifically tracked in the master.
case MachineInfo::UP: {}
default: {
break;
}
}
}
return OK(JSON::protobuf(status), request.url.query.get("jsonp"));
}));
}
string Master::Http::UNRESERVE_HELP()
{
return HELP(
TLDR(
"Unreserve resources dynamically on a specific slave."),
DESCRIPTION(
"Returns 200 OK if the request was accepted. This does not",
"imply that the requested resources have been unreserved successfully:",
"resource unreservation is done asynchronously and may fail.",
"",
"Please provide \"slaveId\" and \"resources\" values designating",
"the resources to be unreserved."));
}
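// NOTE: The following example is illustrative only; the slave ID, role,
// and principal are placeholders. The POST body is a form-encoded query
// string with "slaveId" and "resources" keys, where "resources" is a JSON
// array of `Resource` objects, e.g.:
//
//   slaveId=<slave_id>&resources=[
//     {
//       "name" : "cpus",
//       "type" : "SCALAR",
//       "scalar" : { "value" : 4 },
//       "role" : "<role>",
//       "reservation" : { "principal" : "<operator_principal>" }
//     }
//   ]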
Future<Response> Master::Http::unreserve(
const Request& request,
const Option<string>& principal) const
{
if (request.method != "POST") {
return MethodNotAllowed(
{"POST"}, "Expecting 'POST', received '" + request.method + "'");
}
// Parse the query string in the request body.
Try<hashmap<string, string>> decode =
process::http::query::decode(request.body);
if (decode.isError()) {
return BadRequest("Unable to decode query string: " + decode.error());
}
const hashmap<string, string>& values = decode.get();
if (values.get("slaveId").isNone()) {
return BadRequest("Missing 'slaveId' query parameter");
}
SlaveID slaveId;
slaveId.set_value(values.get("slaveId").get());
Slave* slave = master->slaves.registered.get(slaveId);
if (slave == NULL) {
return BadRequest("No slave found with specified ID");
}
if (values.get("resources").isNone()) {
return BadRequest("Missing 'resources' query parameter");
}
Try<JSON::Array> parse =
JSON::parse<JSON::Array>(values.get("resources").get());
if (parse.isError()) {
return BadRequest(
"Error in parsing 'resources' query parameter: " + parse.error());
}
Resources resources;
foreach (const JSON::Value& value, parse.get().values) {
Try<Resource> resource = ::protobuf::parse<Resource>(value);
if (resource.isError()) {
return BadRequest(
"Error in parsing 'resources' query parameter: " + resource.error());
}
resources += resource.get();
}
// Create an offer operation.
Offer::Operation operation;
operation.set_type(Offer::Operation::UNRESERVE);
operation.mutable_unreserve()->mutable_resources()->CopyFrom(resources);
Option<Error> error = validation::operation::validate(operation.unreserve());
if (error.isSome()) {
return BadRequest(
"Invalid UNRESERVE operation: " + error.get().message);
}
return master->authorizeUnreserveResources(operation.unreserve(), principal)
.then(defer(master->self(), [=](bool authorized) -> Future<Response> {
if (!authorized) {
return Forbidden();
}
return _operation(slaveId, resources, operation);
}));
}
Future<Response> Master::Http::_operation(
const SlaveID& slaveId,
Resources required,
const Offer::Operation& operation) const
{
Slave* slave = master->slaves.registered.get(slaveId);
if (slave == NULL) {
return BadRequest("No slave found with specified ID");
}
// The resources recovered by rescinding outstanding offers.
Resources recovered;
// We pessimistically assume that what seems like "available"
// resources in the allocator will be gone. This can happen due to
// the race between the allocator scheduling an 'allocate' call to
// itself vs. the master's request to schedule 'updateAvailable'.
// We greedily rescind one offer at a time until we've rescinded
// enough offers to cover 'operation'.
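// For example, if 'operation' unreserves 4 reserved CPUs and this slave
// has two outstanding offers each holding 2 of those reserved CPUs, both
// offers are rescinded before the operation is applied.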
foreach (Offer* offer, utils::copy(slave->offers)) {
// If rescinding the offer would not contribute to satisfying
// the required resources, skip it.
if (required == required - offer->resources()) {
continue;
}
recovered += offer->resources();
required -= offer->resources();
// We explicitly pass 'Filters()' which has a default 'refuse_sec'
// of 5 seconds rather than 'None()' here, so that we can
// virtually always win the race against 'allocate'.
master->allocator->recoverResources(
offer->framework_id(),
offer->slave_id(),
offer->resources(),
Filters());
master->removeOffer(offer, true); // Rescind!
// If we've rescinded enough offers to cover 'operation', we're done.
Try<Resources> updatedRecovered = recovered.apply(operation);
if (updatedRecovered.isSome()) {
break;
}
}
// Propagate the 'Future<Nothing>' as 'Future<Response>' where
// 'Nothing' -> 'OK' and Failed -> 'Conflict'.
return master->apply(slave, operation)
.then([]() -> Response { return OK(); })
.repair([](const Future<Response>& result) {
return Conflict(result.failure());
});
}
} // namespace master {
} // namespace internal {
} // namespace mesos {