| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <iomanip> |
| #include <map> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/array.hpp> |
| |
| #include <mesos/type_utils.hpp> |
| |
| #include <mesos/authorizer/authorizer.hpp> |
| |
| #include <process/help.hpp> |
| |
| #include <process/metrics/metrics.hpp> |
| |
| #include <stout/base64.hpp> |
| #include <stout/foreach.hpp> |
| #include <stout/json.hpp> |
| #include <stout/lambda.hpp> |
| #include <stout/net.hpp> |
| #include <stout/numify.hpp> |
| #include <stout/os.hpp> |
| #include <stout/result.hpp> |
| #include <stout/strings.hpp> |
| |
| #include "common/attributes.hpp" |
| #include "common/build.hpp" |
| #include "common/http.hpp" |
| #include "common/protobuf_utils.hpp" |
| |
| #include "internal/devolve.hpp" |
| |
| #include "logging/logging.hpp" |
| |
| #include "master/master.hpp" |
| #include "master/validation.hpp" |
| |
| #include "mesos/mesos.hpp" |
| #include "mesos/resources.hpp" |
| |
| using process::Clock; |
| using process::DESCRIPTION; |
| using process::Future; |
| using process::HELP; |
| using process::TLDR; |
| using process::USAGE; |
| |
| using process::http::Accepted; |
| using process::http::BadRequest; |
| using process::http::Forbidden; |
| using process::http::InternalServerError; |
| using process::http::MethodNotAllowed; |
| using process::http::NotFound; |
| using process::http::NotImplemented; |
| using process::http::NotAcceptable; |
| using process::http::OK; |
| using process::http::Pipe; |
| using process::http::ServiceUnavailable; |
| using process::http::TemporaryRedirect; |
| using process::http::Unauthorized; |
| using process::http::UnsupportedMediaType; |
| |
| using process::metrics::internal::MetricsProcess; |
| |
| using std::map; |
| using std::string; |
| using std::vector; |
| |
| |
| namespace mesos { |
| namespace internal { |
| namespace master { |
| |
| // Pull in model overrides from common. |
| using mesos::internal::model; |
| |
| // Pull in definitions from process. |
| using process::http::Response; |
| using process::http::Request; |
| |
| |
| // TODO(bmahler): Kill these in favor of automatic Proto->JSON Conversion (when |
| // it becomes available). |
| |
| |
| // Returns a JSON object modeled on an Offer. |
| JSON::Object model(const Offer& offer) |
| { |
| JSON::Object object; |
| object.values["id"] = offer.id().value(); |
| object.values["framework_id"] = offer.framework_id().value(); |
| object.values["slave_id"] = offer.slave_id().value(); |
| object.values["resources"] = model(offer.resources()); |
| return object; |
| } |
| |
| |
| // Returns a JSON object summarizing some important fields in a |
| // Framework. |
| JSON::Object summarize(const Framework& framework) |
| { |
| JSON::Object object; |
| object.values["id"] = framework.id().value(); |
| object.values["name"] = framework.info.name(); |
| |
| // Omit pid for http frameworks. |
| if (framework.pid.isSome()) { |
| object.values["pid"] = string(framework.pid.get()); |
| } |
| |
| // TODO(bmahler): Use these in the webui. |
| object.values["used_resources"] = model(framework.totalUsedResources); |
| object.values["offered_resources"] = model(framework.totalOfferedResources); |
| |
| { |
| JSON::Array array; |
| array.values.reserve(framework.info.capabilities_size()); |
| foreach (const FrameworkInfo::Capability& capability, |
| framework.info.capabilities()) { |
| array.values.push_back( |
| FrameworkInfo::Capability::Type_Name(capability.type())); |
| } |
| object.values["capabilities"] = std::move(array); |
| } |
| |
| object.values["hostname"] = framework.info.hostname(); |
| object.values["webui_url"] = framework.info.webui_url(); |
| |
| return object; |
| } |
| |
| |
| // Returns a JSON object modeled on a Framework. |
| JSON::Object model(const Framework& framework) |
| { |
| JSON::Object object = summarize(framework); |
| |
| // Add additional fields to those generated by 'summarize'. |
| object.values["user"] = framework.info.user(); |
| object.values["failover_timeout"] = framework.info.failover_timeout(); |
| object.values["checkpoint"] = framework.info.checkpoint(); |
| object.values["role"] = framework.info.role(); |
| object.values["registered_time"] = framework.registeredTime.secs(); |
| object.values["unregistered_time"] = framework.unregisteredTime.secs(); |
| object.values["active"] = framework.active; |
| |
| // TODO(bmahler): Consider deprecating this in favor of the split |
| // used and offered resources added in 'summarize'. |
| object.values["resources"] = |
| model(framework.totalUsedResources + framework.totalOfferedResources); |
| |
| // TODO(benh): Consider making reregisteredTime an Option. |
| if (framework.registeredTime != framework.reregisteredTime) { |
| object.values["reregistered_time"] = framework.reregisteredTime.secs(); |
| } |
| |
| // Model all of the tasks associated with a framework. |
| { |
| JSON::Array array; |
| array.values.reserve( |
| framework.pendingTasks.size() + framework.tasks.size()); // MESOS-2353. |
| |
| foreachvalue (const TaskInfo& task, framework.pendingTasks) { |
| vector<TaskStatus> statuses; |
| array.values.push_back( |
| model(task, framework.id(), TASK_STAGING, statuses)); |
| } |
| |
| foreachvalue (Task* task, framework.tasks) { |
| array.values.push_back(model(*task)); |
| } |
| |
| object.values["tasks"] = std::move(array); |
| } |
| |
| // Model all of the completed tasks of a framework. |
| { |
| JSON::Array array; |
| array.values.reserve(framework.completedTasks.size()); // MESOS-2353. |
| |
| foreach (const std::shared_ptr<Task>& task, framework.completedTasks) { |
| array.values.push_back(model(*task)); |
| } |
| |
| object.values["completed_tasks"] = std::move(array); |
| } |
| |
| // Model all of the offers associated with a framework. |
| { |
| JSON::Array array; |
| array.values.reserve(framework.offers.size()); // MESOS-2353. |
| |
| foreach (Offer* offer, framework.offers) { |
| array.values.push_back(model(*offer)); |
| } |
| |
| object.values["offers"] = std::move(array); |
| } |
| |
| // Model all of the executors of a framework. |
| { |
| JSON::Array executors; |
| int executorSize = 0; |
| foreachvalue (const auto& executorsMap, |
| framework.executors) { |
| executorSize += executorsMap.size(); |
| } |
| executors.values.reserve(executorSize); // MESOS-2353 |
| foreachpair (const SlaveID& slaveId, |
| const auto& executorsMap, |
| framework.executors) { |
| foreachvalue (const ExecutorInfo& executor, executorsMap) { |
| JSON::Object executorJson = model(executor); |
| executorJson.values["slave_id"] = slaveId.value(); |
| executors.values.push_back(executorJson); |
| } |
| } |
| |
| object.values["executors"] = std::move(executors); |
| } |
| |
| // Model all of the labels associated with a framework. |
| if (framework.info.has_labels()) { |
| JSON::Array array; |
| const mesos::Labels labels = framework.info.labels(); |
| array.values.reserve(labels.labels_size()); |
| |
| foreach (const Label& label, labels.labels()) { |
| array.values.push_back(JSON::Protobuf(label)); |
| } |
| object.values["labels"] = std::move(array); |
| } |
| |
| return object; |
| } |
| |
| |
| // Returns a JSON object summarizing some important fields in a Slave. |
| JSON::Object summarize(const Slave& slave) |
| { |
| JSON::Object object; |
| object.values["id"] = slave.id.value(); |
| object.values["pid"] = string(slave.pid); |
| object.values["hostname"] = slave.info.hostname(); |
| object.values["registered_time"] = slave.registeredTime.secs(); |
| |
| if (slave.reregisteredTime.isSome()) { |
| object.values["reregistered_time"] = slave.reregisteredTime.get().secs(); |
| } |
| |
| const Resources& totalResources = slave.totalResources; |
| object.values["resources"] = model(totalResources); |
| object.values["used_resources"] = model(Resources::sum(slave.usedResources)); |
| object.values["offered_resources"] = model(slave.offeredResources); |
| object.values["reserved_resources"] = model(totalResources.reserved()); |
| object.values["unreserved_resources"] = model(totalResources.unreserved()); |
| |
| object.values["attributes"] = model(slave.info.attributes()); |
| object.values["active"] = slave.active; |
| return object; |
| } |
| |
| |
| // Returns a JSON object modeled after a Slave. |
| // For now there are no additional fields being added to those |
| // generated by 'summarize'. |
| JSON::Object model(const Slave& slave) |
| { |
| return summarize(slave); |
| } |
| |
| |
| // Returns a JSON object modeled after a Role. |
| JSON::Object model(const Role& role) |
| { |
| JSON::Object object; |
| object.values["name"] = role.info.name(); |
| object.values["weight"] = role.info.weight(); |
| object.values["resources"] = model(role.resources()); |
| |
| { |
| JSON::Array array; |
| |
| foreachkey (const FrameworkID& frameworkId, role.frameworks) { |
| array.values.push_back(frameworkId.value()); |
| } |
| |
| object.values["frameworks"] = std::move(array); |
| } |
| |
| return object; |
| } |
| |
| |
| void Master::Http::log(const Request& request) |
| { |
| Option<string> userAgent = request.headers.get("User-Agent"); |
| Option<string> forwardedFor = request.headers.get("X-Forwarded-For"); |
| |
| LOG(INFO) << "HTTP " << request.method << " for " << request.path |
| << " from " << request.client |
| << (userAgent.isSome() |
| ? " with User-Agent='" + userAgent.get() + "'" |
| : "") |
| << (forwardedFor.isSome() |
| ? " with X-Forwarded-For='" + forwardedFor.get() + "'" |
| : ""); |
| } |
| |
| |
| // TODO(ijimenez): Add some information or pointers to help |
| // users understand the HTTP Event/Call API. |
| const string Master::Http::SCHEDULER_HELP = HELP( |
| TLDR( |
| "Endpoint for schedulers to make Calls against the master."), |
| USAGE( |
| "/api/v1/scheduler"), |
| DESCRIPTION( |
| "Returns 202 Accepted iff the request is accepted.")); |
| |
| |
| Future<Response> Master::Http::scheduler(const Request& request) const |
| { |
| // TODO(vinod): Add metrics for rejected requests. |
| |
| // TODO(vinod): Add support for rate limiting. |
| |
| if (!master->elected()) { |
| // Note that this could happen if the scheduler realizes this is the |
| // leading master before master itself realizes it (e.g., ZK watch delay). |
| return ServiceUnavailable("Not the leading master"); |
| } |
| |
| CHECK_SOME(master->recovered); |
| |
| if (!master->recovered.get().isReady()) { |
| return ServiceUnavailable("Master has not finished recovery"); |
| } |
| |
| if (master->flags.authenticate_frameworks) { |
| return Unauthorized( |
| "Mesos master", |
| "HTTP schedulers are not supported when authentication is required"); |
| } |
| |
| if (request.method != "POST") { |
| return MethodNotAllowed( |
| "Expecting a 'POST' request, received '" + request.method + "'"); |
| } |
| |
| v1::scheduler::Call v1Call; |
| |
| // TODO(anand): Content type values are case-insensitive. |
| Option<string> contentType = request.headers.get("Content-Type"); |
| |
| if (contentType.isNone()) { |
| return BadRequest("Expecting 'Content-Type' to be present"); |
| } |
| |
| if (contentType.get() == APPLICATION_PROTOBUF) { |
| if (!v1Call.ParseFromString(request.body)) { |
| return BadRequest("Failed to parse body into Call protobuf"); |
| } |
| } else if (contentType.get() == APPLICATION_JSON) { |
| Try<JSON::Value> value = JSON::parse(request.body); |
| |
| if (value.isError()) { |
| return BadRequest("Failed to parse body into JSON: " + value.error()); |
| } |
| |
| Try<v1::scheduler::Call> parse = |
| ::protobuf::parse<v1::scheduler::Call>(value.get()); |
| |
| if (parse.isError()) { |
| return BadRequest("Failed to convert JSON into Call protobuf: " + |
| parse.error()); |
| } |
| |
| v1Call = parse.get(); |
| } else { |
| return UnsupportedMediaType( |
| string("Expecting 'Content-Type' of ") + |
| APPLICATION_JSON + " or " + APPLICATION_PROTOBUF); |
| } |
| |
| scheduler::Call call = devolve(v1Call); |
| |
| Option<Error> error = validation::scheduler::call::validate(call); |
| |
| if (error.isSome()) { |
| return BadRequest("Failed to validate Scheduler::Call: " + |
| error.get().message); |
| } |
| |
| if (call.type() == scheduler::Call::SUBSCRIBE) { |
| // We default to JSON since an empty 'Accept' header |
| // results in all media types considered acceptable. |
| ContentType responseContentType; |
| |
| if (request.acceptsMediaType(APPLICATION_JSON)) { |
| responseContentType = ContentType::JSON; |
| } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { |
| responseContentType = ContentType::PROTOBUF; |
| } else { |
| return NotAcceptable( |
| string("Expecting 'Accept' to allow ") + |
| "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'"); |
| } |
| |
| Pipe pipe; |
| OK ok; |
| ok.headers["Content-Type"] = stringify(responseContentType); |
| |
| ok.type = Response::PIPE; |
| ok.reader = pipe.reader(); |
| |
| HttpConnection http {pipe.writer(), responseContentType}; |
| master->subscribe(http, call.subscribe()); |
| |
| return ok; |
| } |
| |
| // We consolidate the framework lookup logic here because it is |
| // common for all the call handlers. |
| Framework* framework = master->getFramework(call.framework_id()); |
| |
| if (framework == NULL) { |
| return BadRequest("Framework cannot be found"); |
| } |
| |
| if (!framework->connected) { |
| return Forbidden("Framework is not subscribed"); |
| } |
| |
| switch (call.type()) { |
| case scheduler::Call::TEARDOWN: |
| master->removeFramework(framework); |
| return Accepted(); |
| |
| case scheduler::Call::ACCEPT: |
| master->accept(framework, call.accept()); |
| return Accepted(); |
| |
| case scheduler::Call::DECLINE: |
| master->decline(framework, call.decline()); |
| return Accepted(); |
| |
| case scheduler::Call::REVIVE: |
| master->revive(framework); |
| return Accepted(); |
| |
| case scheduler::Call::KILL: |
| master->kill(framework, call.kill()); |
| return Accepted(); |
| |
| case scheduler::Call::SHUTDOWN: |
| master->shutdown(framework, call.shutdown()); |
| return Accepted(); |
| |
| case scheduler::Call::ACKNOWLEDGE: |
| master->acknowledge(framework, call.acknowledge()); |
| return Accepted(); |
| |
| case scheduler::Call::RECONCILE: |
| master->reconcile(framework, call.reconcile()); |
| return Accepted(); |
| |
| case scheduler::Call::MESSAGE: |
| master->message(framework, call.message()); |
| return Accepted(); |
| |
| case scheduler::Call::REQUEST: |
| master->request(framework, call.request()); |
| return Accepted(); |
| |
| default: |
| // Should be caught during call validation above. |
| LOG(FATAL) << "Unexpected " << call.type() << " call"; |
| } |
| |
| return NotImplemented(); |
| } |
| |
| |
| const string Master::Http::HEALTH_HELP = HELP( |
| TLDR( |
| "Health check of the Master."), |
| USAGE( |
| "/master/health"), |
| DESCRIPTION( |
| "Returns 200 OK iff the Master is healthy.", |
| "Delayed responses are also indicative of poor health.")); |
| |
| |
| Future<Response> Master::Http::health(const Request& request) const |
| { |
| return OK(); |
| } |
| |
| const static string HOSTS_KEY = "hosts"; |
| const static string LEVEL_KEY = "level"; |
| const static string MONITOR_KEY = "monitor"; |
| |
| const string Master::Http::OBSERVE_HELP = HELP( |
| TLDR( |
| "Observe a monitor health state for host(s)."), |
| USAGE( |
| "/master/observe"), |
| DESCRIPTION( |
| "This endpoint receives information indicating host(s) ", |
| "health." |
| "", |
| "The following fields should be supplied in a POST:", |
| "1. " + MONITOR_KEY + " - name of the monitor that is being reported", |
| "2. " + HOSTS_KEY + " - comma separated list of hosts", |
| "3. " + LEVEL_KEY + " - OK for healthy, anything else for unhealthy")); |
| |
| |
| Try<string> getFormValue( |
| const string& key, |
| const hashmap<string, string>& values) |
| { |
| Option<string> value = values.get(key); |
| |
| if (value.isNone()) { |
| return Error("Missing value for '" + key + "'."); |
| } |
| |
| // HTTP decode the value. |
| Try<string> decodedValue = process::http::decode(value.get()); |
| if (decodedValue.isError()) { |
| return decodedValue; |
| } |
| |
| // Treat empty string as an error. |
| if (decodedValue.isSome() && decodedValue.get().empty()) { |
| return Error("Empty string for '" + key + "'."); |
| } |
| |
| return decodedValue.get(); |
| } |
| |
| |
| Future<Response> Master::Http::observe(const Request& request) const |
| { |
| Try<hashmap<string, string>> decode = |
| process::http::query::decode(request.body); |
| |
| if (decode.isError()) { |
| return BadRequest("Unable to decode query string: " + decode.error()); |
| } |
| |
| hashmap<string, string> values = decode.get(); |
| |
| // Build up a JSON object of the values we recieved and send them back |
| // down the wire as JSON for validation / confirmation. |
| JSON::Object response; |
| |
| // TODO(ccarson): As soon as RepairCoordinator is introduced it will |
| // consume these values. We should revisit if we still want to send the |
| // JSON down the wire at that point. |
| |
| // Add 'monitor'. |
| Try<string> monitor = getFormValue(MONITOR_KEY, values); |
| if (monitor.isError()) { |
| return BadRequest(monitor.error()); |
| } |
| response.values[MONITOR_KEY] = monitor.get(); |
| |
| // Add 'hosts'. |
| Try<string> hostsString = getFormValue(HOSTS_KEY, values); |
| if (hostsString.isError()) { |
| return BadRequest(hostsString.error()); |
| } |
| |
| vector<string> hosts = strings::split(hostsString.get(), ","); |
| JSON::Array hostArray; |
| hostArray.values.assign(hosts.begin(), hosts.end()); |
| |
| response.values[HOSTS_KEY] = hostArray; |
| |
| // Add 'isHealthy'. |
| Try<string> level = getFormValue(LEVEL_KEY, values); |
| if (level.isError()) { |
| return BadRequest(level.error()); |
| } |
| |
| bool isHealthy = strings::upper(level.get()) == "OK"; |
| |
| response.values["isHealthy"] = isHealthy; |
| |
| return OK(response); |
| } |
| |
| |
| const string Master::Http::REDIRECT_HELP = HELP( |
| TLDR( |
| "Redirects to the leading Master."), |
| USAGE( |
| "/master/redirect"), |
| DESCRIPTION( |
| "This returns a 307 Temporary Redirect to the leading Master.", |
| "If no Master is leading (according to this Master), then the", |
| "Master will redirect to itself.", |
| "", |
| "**NOTES:**", |
| "1. This is the recommended way to bookmark the WebUI when", |
| "running multiple Masters.", |
| "2. This is broken currently \"on the cloud\" (e.g. EC2) as", |
| "this will attempt to redirect to the private IP address, unless", |
| "advertise_ip points to an externally accessible IP")); |
| |
| |
| Future<Response> Master::Http::redirect(const Request& request) const |
| { |
| // If there's no leader, redirect to this master's base url. |
| MasterInfo info = master->leader.isSome() |
| ? master->leader.get() |
| : master->info_; |
| |
| // NOTE: Currently, 'info.ip()' stores ip in network order, which |
| // should be fixed. See MESOS-1201 for details. |
| Try<string> hostname = info.has_hostname() |
| ? info.hostname() |
| : net::getHostname(net::IP(ntohl(info.ip()))); |
| |
| if (hostname.isError()) { |
| return InternalServerError(hostname.error()); |
| } |
| |
| // NOTE: We can use a protocol-relative URL here in order to allow |
| // the browser (or other HTTP client) to prefix with 'http:' or |
| // 'https:' depending on the original request. See |
| // https://tools.ietf.org/html/rfc7231#section-7.1.2 as well as |
| // http://stackoverflow.com/questions/12436669/using-protocol-relative-uris-within-location-headers |
| // which discusses this. |
| return TemporaryRedirect( |
| "//" + hostname.get() + ":" + stringify(info.port())); |
| } |
| |
| |
| const string Master::Http::SLAVES_HELP = HELP( |
| TLDR( |
| "Information about registered slaves."), |
| USAGE( |
| "/master/slaves"), |
| DESCRIPTION( |
| "This endpoint shows information about the slaves registered in", |
| "this master formatted as a JSON object.")); |
| |
| |
| Future<Response> Master::Http::slaves(const Request& request) const |
| { |
| JSON::Object object; |
| |
| { |
| JSON::Array array; |
| array.values.reserve(master->slaves.registered.size()); // MESOS-2353. |
| |
| foreachvalue (const Slave* slave, master->slaves.registered) { |
| array.values.push_back(model(*slave)); |
| } |
| |
| object.values["slaves"] = std::move(array); |
| } |
| |
| |
| return OK(object, request.query.get("jsonp")); |
| } |
| |
| |
| const string Master::Http::STATE_HELP = HELP( |
| TLDR( |
| "Information about state of master."), |
| USAGE( |
| "/master/state.json"), |
| DESCRIPTION( |
| "This endpoint shows information about the frameworks, tasks,", |
| "executors and slaves running in the cluster as a JSON object.")); |
| |
| |
| Future<Response> Master::Http::state(const Request& request) const |
| { |
| JSON::Object object; |
| object.values["version"] = MESOS_VERSION; |
| |
| if (build::GIT_SHA.isSome()) { |
| object.values["git_sha"] = build::GIT_SHA.get(); |
| } |
| |
| if (build::GIT_BRANCH.isSome()) { |
| object.values["git_branch"] = build::GIT_BRANCH.get(); |
| } |
| |
| if (build::GIT_TAG.isSome()) { |
| object.values["git_tag"] = build::GIT_TAG.get(); |
| } |
| |
| object.values["build_date"] = build::DATE; |
| object.values["build_time"] = build::TIME; |
| object.values["build_user"] = build::USER; |
| object.values["start_time"] = master->startTime.secs(); |
| |
| if (master->electedTime.isSome()) { |
| object.values["elected_time"] = master->electedTime.get().secs(); |
| } |
| |
| object.values["id"] = master->info().id(); |
| object.values["pid"] = string(master->self()); |
| object.values["hostname"] = master->info().hostname(); |
| object.values["activated_slaves"] = master->_slaves_active(); |
| object.values["deactivated_slaves"] = master->_slaves_inactive(); |
| |
| if (master->flags.cluster.isSome()) { |
| object.values["cluster"] = master->flags.cluster.get(); |
| } |
| |
| if (master->leader.isSome()) { |
| object.values["leader"] = master->leader.get().pid(); |
| } |
| |
| if (master->flags.log_dir.isSome()) { |
| object.values["log_dir"] = master->flags.log_dir.get(); |
| } |
| |
| if (master->flags.external_log_file.isSome()) { |
| object.values["external_log_file"] = master->flags.external_log_file.get(); |
| } |
| |
| { |
| JSON::Object flags; |
| foreachpair (const string& name, const flags::Flag& flag, master->flags) { |
| Option<string> value = flag.stringify(master->flags); |
| if (value.isSome()) { |
| flags.values[name] = value.get(); |
| } |
| } |
| object.values["flags"] = std::move(flags); |
| } |
| |
| // Model all of the slaves. |
| { |
| JSON::Array array; |
| array.values.reserve(master->slaves.registered.size()); // MESOS-2353. |
| |
| foreachvalue (Slave* slave, master->slaves.registered) { |
| array.values.push_back(model(*slave)); |
| } |
| |
| object.values["slaves"] = std::move(array); |
| } |
| |
| // Model all of the frameworks. |
| { |
| JSON::Array array; |
| array.values.reserve(master->frameworks.registered.size()); // MESOS-2353. |
| |
| foreachvalue (Framework* framework, master->frameworks.registered) { |
| array.values.push_back(model(*framework)); |
| } |
| |
| object.values["frameworks"] = std::move(array); |
| } |
| |
| // Model all of the completed frameworks. |
| { |
| JSON::Array array; |
| array.values.reserve(master->frameworks.completed.size()); // MESOS-2353. |
| |
| foreach (const std::shared_ptr<Framework>& framework, |
| master->frameworks.completed) { |
| array.values.push_back(model(*framework)); |
| } |
| |
| object.values["completed_frameworks"] = std::move(array); |
| } |
| |
| // Model all of the orphan tasks. |
| { |
| JSON::Array array; |
| |
| // Find those orphan tasks. |
| foreachvalue (const Slave* slave, master->slaves.registered) { |
| typedef hashmap<TaskID, Task*> TaskMap; |
| foreachvalue (const TaskMap& tasks, slave->tasks) { |
| foreachvalue (const Task* task, tasks) { |
| CHECK_NOTNULL(task); |
| if (!master->frameworks.registered.contains(task->framework_id())) { |
| array.values.push_back(model(*task)); |
| } |
| } |
| } |
| } |
| |
| object.values["orphan_tasks"] = std::move(array); |
| } |
| |
| // Model all currently unregistered frameworks. |
| // This could happen when the framework has yet to re-register |
| // after master failover. |
| { |
| JSON::Array array; |
| |
| // Find unregistered frameworks. |
| foreachvalue (const Slave* slave, master->slaves.registered) { |
| foreachkey (const FrameworkID& frameworkId, slave->tasks) { |
| if (!master->frameworks.registered.contains(frameworkId)) { |
| array.values.push_back(frameworkId.value()); |
| } |
| } |
| } |
| |
| object.values["unregistered_frameworks"] = std::move(array); |
| } |
| |
| return OK(object, request.query.get("jsonp")); |
| } |
| |
| |
| // This abstraction has no side-effects. It factors out computing the |
| // mapping from 'slaves' to 'frameworks' to answer the questions 'what |
| // frameworks are running on a given slave?' and 'what slaves are |
| // running the given framework?'. |
| class SlaveFrameworkMapping |
| { |
| public: |
| SlaveFrameworkMapping(const hashmap<FrameworkID, Framework*>& frameworks) |
| { |
| foreachpair (const FrameworkID& frameworkId, |
| const Framework* framework, |
| frameworks) { |
| foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) { |
| frameworksToSlaves[frameworkId].insert(taskInfo.slave_id()); |
| slavesToFrameworks[taskInfo.slave_id()].insert(frameworkId); |
| } |
| |
| foreachvalue (const Task* task, framework->tasks) { |
| frameworksToSlaves[frameworkId].insert(task->slave_id()); |
| slavesToFrameworks[task->slave_id()].insert(frameworkId); |
| } |
| |
| foreach (const std::shared_ptr<Task>& task, framework->completedTasks) { |
| frameworksToSlaves[frameworkId].insert(task->slave_id()); |
| slavesToFrameworks[task->slave_id()].insert(frameworkId); |
| } |
| } |
| } |
| |
| const hashset<FrameworkID>& frameworks(const SlaveID& slaveId) const |
| { |
| const auto iterator = slavesToFrameworks.find(slaveId); |
| return iterator != slavesToFrameworks.end() ? |
| iterator->second : hashset<FrameworkID>::EMPTY; |
| } |
| |
| const hashset<SlaveID>& slaves(const FrameworkID& frameworkId) const |
| { |
| const auto iterator = frameworksToSlaves.find(frameworkId); |
| return iterator != frameworksToSlaves.end() ? |
| iterator->second : hashset<SlaveID>::EMPTY; |
| } |
| |
| private: |
| hashmap<SlaveID, hashset<FrameworkID>> slavesToFrameworks; |
| hashmap<FrameworkID, hashset<SlaveID>> frameworksToSlaves; |
| }; |
| |
| |
| // This abstraction has no side-effects. It factors out the accounting |
| // for a 'TaskState' summary. We use this to summarize 'TaskState's |
| // for both frameworks as well as slaves. |
| struct TaskStateSummary |
| { |
| // TODO(jmlvanre): Possibly clean this up as per MESOS-2694. |
| const static TaskStateSummary EMPTY; |
| |
| TaskStateSummary() |
| : staging(0), |
| starting(0), |
| running(0), |
| finished(0), |
| killed(0), |
| failed(0), |
| lost(0), |
| error(0) {} |
| |
| // Account for the state of the given task. |
| void count(const Task& task) |
| { |
| switch (task.state()) { |
| case TASK_STAGING: { ++staging; break; } |
| case TASK_STARTING: { ++starting; break; } |
| case TASK_RUNNING: { ++running; break; } |
| case TASK_FINISHED: { ++finished; break; } |
| case TASK_KILLED: { ++killed; break; } |
| case TASK_FAILED: { ++failed; break; } |
| case TASK_LOST: { ++lost; break; } |
| case TASK_ERROR: { ++error; break; } |
| // No default case allows for a helpful compiler error if we |
| // introduce a new state. |
| } |
| } |
| |
| size_t staging; |
| size_t starting; |
| size_t running; |
| size_t finished; |
| size_t killed; |
| size_t failed; |
| size_t lost; |
| size_t error; |
| }; |
| |
| |
| const TaskStateSummary TaskStateSummary::EMPTY; |
| |
| |
| // This abstraction has no side-effects. It factors out computing the |
| // 'TaskState' summaries for frameworks and slaves. This answers the |
| // questions 'How many tasks are in each state for a given framework?' |
| // and 'How many tasks are in each state for a given slave?'. |
| class TaskStateSummaries |
| { |
| public: |
| TaskStateSummaries(const hashmap<FrameworkID, Framework*>& frameworks) |
| { |
| foreachpair (const FrameworkID& frameworkId, |
| const Framework* framework, |
| frameworks) { |
| foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) { |
| frameworkTaskSummaries[frameworkId].staging++; |
| slaveTaskSummaries[taskInfo.slave_id()].staging++; |
| } |
| |
| foreachvalue (const Task* task, framework->tasks) { |
| frameworkTaskSummaries[frameworkId].count(*task); |
| slaveTaskSummaries[task->slave_id()].count(*task); |
| } |
| |
| foreach (const std::shared_ptr<Task>& task, framework->completedTasks) { |
| frameworkTaskSummaries[frameworkId].count(*task); |
| slaveTaskSummaries[task->slave_id()].count(*task); |
| } |
| } |
| } |
| |
| const TaskStateSummary& framework(const FrameworkID& frameworkId) const |
| { |
| const auto iterator = frameworkTaskSummaries.find(frameworkId); |
| return iterator != frameworkTaskSummaries.end() ? |
| iterator->second : TaskStateSummary::EMPTY; |
| } |
| |
| const TaskStateSummary& slave(const SlaveID& slaveId) const |
| { |
| const auto iterator = slaveTaskSummaries.find(slaveId); |
| return iterator != slaveTaskSummaries.end() ? |
| iterator->second : TaskStateSummary::EMPTY; |
| } |
| private: |
| hashmap<FrameworkID, TaskStateSummary> frameworkTaskSummaries; |
| hashmap<SlaveID, TaskStateSummary> slaveTaskSummaries; |
| }; |
| |
| |
| const string Master::Http::STATESUMMARY_HELP = HELP( |
| TLDR( |
| "Summary of state of all tasks and registered frameworks in cluster."), |
| USAGE( |
| "/master/state-summary"), |
| DESCRIPTION( |
| "This endpoint gives a summary of the state of all tasks and", |
| "registered frameworks in the cluster as a JSON object.")); |
| |
| |
| Future<Response> Master::Http::stateSummary(const Request& request) const |
| { |
| JSON::Object object; |
| |
| object.values["hostname"] = master->info().hostname(); |
| |
| if (master->flags.cluster.isSome()) { |
| object.values["cluster"] = master->flags.cluster.get(); |
| } |
| |
| // We use the tasks in the 'Frameworks' struct to compute summaries |
| // for this endpoint. This is done 1) for consistency between the |
| // 'slaves' and 'frameworks' subsections below 2) because we want to |
| // provide summary information for frameworks that are currently |
| // registered 3) the frameworks keep a circular buffer of completed |
| // tasks that we can use to keep a limited view on the history of |
| // recent completed / failed tasks. |
| |
| // Generate mappings from 'slave' to 'framework' and reverse. |
| SlaveFrameworkMapping slaveFrameworkMapping(master->frameworks.registered); |
| |
| // Generate 'TaskState' summaries for all framework and slave ids. |
| TaskStateSummaries taskStateSummaries(master->frameworks.registered); |
| |
| // Model all of the slaves. |
| { |
| JSON::Array array; |
| array.values.reserve(master->slaves.registered.size()); // MESOS-2353. |
| |
| foreachvalue (Slave* slave, master->slaves.registered) { |
| JSON::Object json = summarize(*slave); |
| |
| // Add the 'TaskState' summary for this slave. |
| const TaskStateSummary& summary = taskStateSummaries.slave(slave->id); |
| |
| json.values["TASK_STAGING"] = summary.staging; |
| json.values["TASK_STARTING"] = summary.starting; |
| json.values["TASK_RUNNING"] = summary.running; |
| json.values["TASK_FINISHED"] = summary.finished; |
| json.values["TASK_KILLED"] = summary.killed; |
| json.values["TASK_FAILED"] = summary.failed; |
| json.values["TASK_LOST"] = summary.lost; |
| json.values["TASK_ERROR"] = summary.error; |
| |
| // Add the ids of all the frameworks running on this slave. |
| const hashset<FrameworkID>& frameworks = |
| slaveFrameworkMapping.frameworks(slave->id); |
| |
| JSON::Array frameworkIdArray; |
| frameworkIdArray.values.reserve(frameworks.size()); // MESOS-2353. |
| |
| foreach (const FrameworkID& frameworkId, frameworks) { |
| frameworkIdArray.values.push_back(frameworkId.value()); |
| } |
| |
| json.values["framework_ids"] = std::move(frameworkIdArray); |
| |
| array.values.push_back(std::move(json)); |
| } |
| |
| object.values["slaves"] = std::move(array); |
| } |
| |
| // Model all of the frameworks. |
| { |
| JSON::Array array; |
| array.values.reserve(master->frameworks.registered.size()); // MESOS-2353. |
| |
| foreachpair (const FrameworkID& frameworkId, |
| Framework* framework, |
| master->frameworks.registered) { |
| JSON::Object json = summarize(*framework); |
| |
| // Add the 'TaskState' summary for this framework. |
| const TaskStateSummary& summary = |
| taskStateSummaries.framework(frameworkId); |
| json.values["TASK_STAGING"] = summary.staging; |
| json.values["TASK_STARTING"] = summary.starting; |
| json.values["TASK_RUNNING"] = summary.running; |
| json.values["TASK_FINISHED"] = summary.finished; |
| json.values["TASK_KILLED"] = summary.killed; |
| json.values["TASK_FAILED"] = summary.failed; |
| json.values["TASK_LOST"] = summary.lost; |
| json.values["TASK_ERROR"] = summary.error; |
| |
| // Add the ids of all the slaves running this framework. |
| const hashset<SlaveID>& slaves = |
| slaveFrameworkMapping.slaves(frameworkId); |
| |
| JSON::Array slaveIdArray; |
| slaveIdArray.values.reserve(slaves.size()); // MESOS-2353. |
| |
| foreach (const SlaveID& slaveId, slaves) { |
| slaveIdArray.values.push_back(slaveId.value()); |
| } |
| |
| json.values["slave_ids"] = std::move(slaveIdArray); |
| |
| array.values.push_back(std::move(json)); |
| } |
| |
| object.values["frameworks"] = std::move(array); |
| } |
| |
| return OK(object, request.query.get("jsonp")); |
| } |
| |
| |
| const string Master::Http::ROLES_HELP = HELP( |
| TLDR( |
| "Information about roles that the master is configured with."), |
| USAGE( |
| "/master/roles.json"), |
| DESCRIPTION( |
| "This endpoint gives information about the roles that are assigned", |
| "to frameworks and resources as a JSON object.")); |
| |
| |
| Future<Response> Master::Http::roles(const Request& request) const |
| { |
| JSON::Object object; |
| |
| // Model all of the roles. |
| { |
| JSON::Array array; |
| foreachvalue (Role* role, master->roles) { |
| array.values.push_back(model(*role)); |
| } |
| |
| object.values["roles"] = std::move(array); |
| } |
| |
| return OK(object, request.query.get("jsonp")); |
| } |
| |
| |
| const string Master::Http::TEARDOWN_HELP = HELP( |
| TLDR( |
| "Tears down a running framework by shutting down all tasks/executors " |
| "and removing the framework."), |
| USAGE( |
| "/master/teardown"), |
| DESCRIPTION( |
| "Please provide a \"frameworkId\" value designating the running " |
| "framework to tear down.", |
| "Returns 200 OK if the framework was correctly teared down.")); |
| |
| |
| Future<Response> Master::Http::teardown(const Request& request) const |
| { |
| if (request.method != "POST") { |
| return BadRequest("Expecting POST"); |
| } |
| |
| // Parse the query string in the request body (since this is a POST) |
| // in order to determine the framework ID to shutdown. |
| Try<hashmap<string, string>> decode = |
| process::http::query::decode(request.body); |
| |
| if (decode.isError()) { |
| return BadRequest("Unable to decode query string: " + decode.error()); |
| } |
| |
| hashmap<string, string> values = decode.get(); |
| |
| if (values.get("frameworkId").isNone()) { |
| return BadRequest("Missing 'frameworkId' query parameter"); |
| } |
| |
| FrameworkID id; |
| id.set_value(values.get("frameworkId").get()); |
| |
| Framework* framework = master->getFramework(id); |
| |
| if (framework == NULL) { |
| return BadRequest("No framework found with specified ID"); |
| } |
| |
| Result<Credential> credential = authenticate(request); |
| |
| if (credential.isError()) { |
| return Unauthorized("Mesos master", credential.error()); |
| } |
| |
| // Skip authorization if no ACLs were provided to the master. |
| if (master->authorizer.isNone()) { |
| return _teardown(id); |
| } |
| |
| mesos::ACL::ShutdownFramework shutdown; |
| |
| if (credential.isSome()) { |
| shutdown.mutable_principals()->add_values(credential.get().principal()); |
| } else { |
| shutdown.mutable_principals()->set_type(ACL::Entity::ANY); |
| } |
| |
| if (framework->info.has_principal()) { |
| shutdown.mutable_framework_principals()->add_values( |
| framework->info.principal()); |
| } else { |
| shutdown.mutable_framework_principals()->set_type(ACL::Entity::ANY); |
| } |
| |
| lambda::function<Future<Response>(bool)> _teardown = |
| lambda::bind(&Master::Http::_teardown, this, id, lambda::_1); |
| |
| return master->authorizer.get()->authorize(shutdown) |
| .then(defer(master->self(), _teardown)); |
| } |
| |
| |
| Future<Response> Master::Http::_teardown( |
| const FrameworkID& id, |
| bool authorized) const |
| { |
| if (!authorized) { |
| return Unauthorized("Mesos master"); |
| } |
| |
| Framework* framework = master->getFramework(id); |
| |
| if (framework == NULL) { |
| return BadRequest("No framework found with ID " + stringify(id)); |
| } |
| |
| // TODO(ijimenez): Do 'removeFramework' asynchronously. |
| master->removeFramework(framework); |
| |
| return OK(); |
| } |
| |
| |
| const string Master::Http::TASKS_HELP = HELP( |
| TLDR( |
| "Lists tasks from all active frameworks."), |
| USAGE( |
| "/master/tasks.json"), |
| DESCRIPTION( |
| "Lists known tasks.", |
| "", |
| "Query parameters:", |
| "", |
| "> limit=VALUE Maximum number of tasks returned " |
| "(default is " + stringify(TASK_LIMIT) + ").", |
| "> offset=VALUE Starts task list at offset.", |
| "> order=(asc|desc) Ascending or descending sort order " |
| "(default is descending)." |
| "")); |
| |
| |
| struct TaskComparator |
| { |
| static bool ascending(const Task* lhs, const Task* rhs) |
| { |
| size_t lhsSize = lhs->statuses().size(); |
| size_t rhsSize = rhs->statuses().size(); |
| |
| if ((lhsSize == 0) && (rhsSize == 0)) { |
| return false; |
| } |
| |
| if (lhsSize == 0) { |
| return true; |
| } |
| |
| if (rhsSize == 0) { |
| return false; |
| } |
| |
| return (lhs->statuses(0).timestamp() < rhs->statuses(0).timestamp()); |
| } |
| |
| static bool descending(const Task* lhs, const Task* rhs) |
| { |
| size_t lhsSize = lhs->statuses().size(); |
| size_t rhsSize = rhs->statuses().size(); |
| |
| if ((lhsSize == 0) && (rhsSize == 0)) { |
| return false; |
| } |
| |
| if (rhsSize == 0) { |
| return true; |
| } |
| |
| if (lhsSize == 0) { |
| return false; |
| } |
| |
| return (lhs->statuses(0).timestamp() > rhs->statuses(0).timestamp()); |
| } |
| }; |
| |
| |
| Future<Response> Master::Http::tasks(const Request& request) const |
| { |
| // Get list options (limit and offset). |
| Result<int> result = numify<int>(request.query.get("limit")); |
| size_t limit = result.isSome() ? result.get() : TASK_LIMIT; |
| |
| result = numify<int>(request.query.get("offset")); |
| size_t offset = result.isSome() ? result.get() : 0; |
| |
| // TODO(nnielsen): Currently, formatting errors in offset and/or limit |
| // will silently be ignored. This could be reported to the user instead. |
| |
| // Construct framework list with both active and completed framwworks. |
| vector<const Framework*> frameworks; |
| foreachvalue (Framework* framework, master->frameworks.registered) { |
| frameworks.push_back(framework); |
| } |
| foreach (const std::shared_ptr<Framework>& framework, |
| master->frameworks.completed) { |
| frameworks.push_back(framework.get()); |
| } |
| |
| // Construct task list with both running and finished tasks. |
| vector<const Task*> tasks; |
| foreach (const Framework* framework, frameworks) { |
| foreachvalue (Task* task, framework->tasks) { |
| CHECK_NOTNULL(task); |
| tasks.push_back(task); |
| } |
| foreach (const std::shared_ptr<Task>& task, framework->completedTasks) { |
| tasks.push_back(task.get()); |
| } |
| } |
| |
| // Sort tasks by task status timestamp. Default order is descending. |
| // The earliest timestamp is chosen for comparison when multiple are present. |
| Option<string> order = request.query.get("order"); |
| if (order.isSome() && (order.get() == "asc")) { |
| sort(tasks.begin(), tasks.end(), TaskComparator::ascending); |
| } else { |
| sort(tasks.begin(), tasks.end(), TaskComparator::descending); |
| } |
| |
| JSON::Object object; |
| |
| { |
| JSON::Array array; |
| size_t end = std::min(offset + limit, tasks.size()); |
| for (size_t i = offset; i < end; i++) { |
| const Task* task = tasks[i]; |
| array.values.push_back(model(*task)); |
| } |
| |
| object.values["tasks"] = std::move(array); |
| } |
| |
| return OK(object, request.query.get("jsonp")); |
| } |
| |
| |
| Result<Credential> Master::Http::authenticate(const Request& request) const |
| { |
| // By default, assume everyone is authenticated if no credentials |
| // were provided. |
| if (master->credentials.isNone()) { |
| return None(); |
| } |
| |
| Option<string> authorization = request.headers.get("Authorization"); |
| |
| if (authorization.isNone()) { |
| return Error("Missing 'Authorization' request header"); |
| } |
| |
| Try<string> decode = |
| base64::decode(strings::split(authorization.get(), " ", 2)[1]); |
| |
| if (decode.isError()) { |
| return Error("Failed to decode 'Authorization' header: " + decode.error()); |
| } |
| |
| vector<string> pairs = strings::split(decode.get(), ":", 2); |
| |
| if (pairs.size() != 2) { |
| return Error("Malformed 'Authorization' request header"); |
| } |
| |
| const string& username = pairs[0]; |
| const string& password = pairs[1]; |
| |
| foreach (const Credential& credential, |
| master->credentials.get().credentials()) { |
| if (credential.principal() == username && |
| credential.secret() == password) { |
| return credential; |
| } |
| } |
| |
| return Error("Could not authenticate '" + username + "'"); |
| } |
| |
| |
| } // namespace master { |
| } // namespace internal { |
| } // namespace mesos { |