blob: 226fed440e0bdf4eb87cd5a4a9773105d62bea8a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <map>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <mesos/attributes.hpp>
#include <mesos/http.hpp>
#include <mesos/resources.hpp>
#include <mesos/authentication/http/basic_authenticator_factory.hpp>
#include <mesos/authentication/http/combined_authenticator.hpp>
#include <mesos/authorizer/authorizer.hpp>
#include <mesos/module/http_authenticator.hpp>
#include <mesos/quota/quota.hpp>
#include <process/authenticator.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
#include <process/pid.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/protobuf.hpp>
#include <stout/recordio.hpp>
#include <stout/stringify.hpp>
#include <stout/unreachable.hpp>
#include <stout/os/permissions.hpp>
#include "common/http.hpp"
#include "messages/messages.hpp"
#include "module/manager.hpp"
using std::map;
using std::ostream;
using std::set;
using std::string;
using std::vector;
using process::Future;
using process::Owned;
using process::Failure;
using process::Owned;
#ifdef USE_SSL_SOCKET
using process::http::authentication::JWTAuthenticator;
#endif // USE_SSL_SOCKET
using process::http::authentication::Principal;
using process::http::authorization::AuthorizationCallbacks;
using mesos::http::authentication::BasicAuthenticatorFactory;
using mesos::http::authentication::CombinedAuthenticator;
namespace mesos {
ostream& operator<<(ostream& stream, ContentType contentType)
{
switch (contentType) {
case ContentType::PROTOBUF: {
return stream << APPLICATION_PROTOBUF;
}
case ContentType::JSON: {
return stream << APPLICATION_JSON;
}
case ContentType::RECORDIO: {
return stream << APPLICATION_RECORDIO;
}
}
UNREACHABLE();
}
namespace internal {
// Set of endpoint whose access is protected with the authorization
// action `GET_ENDPOINTS_WITH_PATH`.
hashset<string> AUTHORIZABLE_ENDPOINTS{
"/containers",
"/files/debug",
"/files/debug.json",
"/logging/toggle",
"/metrics/snapshot",
"/monitor/statistics",
"/monitor/statistics.json"};
string serialize(
ContentType contentType,
const google::protobuf::Message& message)
{
switch (contentType) {
case ContentType::PROTOBUF: {
return message.SerializeAsString();
}
case ContentType::JSON: {
return stringify(JSON::protobuf(message));
}
case ContentType::RECORDIO: {
LOG(FATAL) << "Serializing a RecordIO stream is not supported";
}
}
UNREACHABLE();
}
bool streamingMediaType(ContentType contentType)
{
switch(contentType) {
case ContentType::PROTOBUF:
case ContentType::JSON: {
return false;
}
case ContentType::RECORDIO: {
return true;
}
}
UNREACHABLE();
}
// TODO(bmahler): Kill these in favor of automatic Proto->JSON
// Conversion (when it becomes available).
// Helper function that returns the JSON::value of a given resource (identified
// by 'name' and 'type') inside the resources.
static JSON::Value value(
const string& name,
const Value::Type& type,
const Resources& resources)
{
switch (type) {
case Value::SCALAR:
return resources.get<Value::Scalar>(name).get().value();
case Value::RANGES:
return stringify(resources.get<Value::Ranges>(name).get());
case Value::SET:
return stringify(resources.get<Value::Set>(name).get());
default:
LOG(FATAL) << "Unexpected Value type: " << type;
}
UNREACHABLE();
}
JSON::Object model(const Resources& resources)
{
JSON::Object object;
object.values["cpus"] = 0;
object.values["gpus"] = 0;
object.values["mem"] = 0;
object.values["disk"] = 0;
// Model non-revocable resources.
Resources nonRevocable = resources.nonRevocable();
foreachpair (
const string& name, const Value::Type& type, nonRevocable.types()) {
object.values[name] = value(name, type, nonRevocable);
}
// Model revocable resources.
Resources revocable = resources.revocable();
foreachpair (const string& name, const Value::Type& type, revocable.types()) {
object.values[name + "_revocable"] = value(name, type, revocable);
}
return object;
}
JSON::Object model(const hashmap<string, Resources>& roleResources)
{
JSON::Object object;
foreachpair (const string& role, const Resources& resources, roleResources) {
object.values[role] = model(resources);
}
return object;
}
JSON::Object model(const Attributes& attributes)
{
JSON::Object object;
foreach (const Attribute& attribute, attributes) {
switch (attribute.type()) {
case Value::SCALAR:
object.values[attribute.name()] = attribute.scalar().value();
break;
case Value::RANGES:
object.values[attribute.name()] = stringify(attribute.ranges());
break;
case Value::SET:
object.values[attribute.name()] = stringify(attribute.set());
break;
case Value::TEXT:
object.values[attribute.name()] = attribute.text().value();
break;
default:
LOG(FATAL) << "Unexpected Value type: " << attribute.type();
break;
}
}
return object;
}
JSON::Array model(const Labels& labels)
{
return JSON::protobuf(labels.labels());
}
JSON::Object model(const NetworkInfo& info)
{
JSON::Object object;
if (info.groups().size() > 0) {
JSON::Array array;
array.values.reserve(info.groups().size()); // MESOS-2353.
foreach (const string& group, info.groups()) {
array.values.push_back(group);
}
object.values["groups"] = std::move(array);
}
if (info.has_labels()) {
object.values["labels"] = model(info.labels());
}
if (info.ip_addresses().size() > 0) {
JSON::Array array;
array.values.reserve(info.ip_addresses().size()); // MESOS-2353.
foreach (const NetworkInfo::IPAddress& ipAddress, info.ip_addresses()) {
array.values.push_back(JSON::protobuf(ipAddress));
}
object.values["ip_addresses"] = std::move(array);
}
if (info.has_name()) {
object.values["name"] = info.name();
}
if (info.port_mappings().size() > 0) {
JSON::Array array;
array.values.reserve(info.port_mappings().size()); // MESOS-2353
foreach (const NetworkInfo::PortMapping& portMapping,
info.port_mappings()) {
array.values.push_back(JSON::protobuf(portMapping));
}
object.values["port_mappings"] = std::move(array);
}
return object;
}
JSON::Object model(const ContainerStatus& status)
{
JSON::Object object;
if (status.has_container_id()) {
object.values["container_id"] = JSON::protobuf(status.container_id());
}
if (status.network_infos().size() > 0) {
JSON::Array array;
array.values.reserve(status.network_infos().size()); // MESOS-2353.
foreach (const NetworkInfo& info, status.network_infos()) {
array.values.push_back(model(info));
}
object.values["network_infos"] = std::move(array);
}
if (status.has_cgroup_info()) {
object.values["cgroup_info"] = JSON::protobuf(status.cgroup_info());
}
return object;
}
// Returns a JSON object modeled on a TaskStatus.
JSON::Object model(const TaskStatus& status)
{
JSON::Object object;
object.values["state"] = TaskState_Name(status.state());
object.values["timestamp"] = status.timestamp();
if (status.has_labels()) {
object.values["labels"] = model(status.labels());
}
if (status.has_container_status()) {
object.values["container_status"] = model(status.container_status());
}
if (status.has_healthy()) {
object.values["healthy"] = status.healthy();
}
return object;
}
// TODO(bmahler): Expose the executor name / source.
JSON::Object model(const Task& task)
{
JSON::Object object;
object.values["id"] = task.task_id().value();
object.values["name"] = task.name();
object.values["framework_id"] = task.framework_id().value();
if (task.has_executor_id()) {
object.values["executor_id"] = task.executor_id().value();
} else {
object.values["executor_id"] = "";
}
object.values["slave_id"] = task.slave_id().value();
object.values["state"] = TaskState_Name(task.state());
object.values["resources"] = model(task.resources());
if (task.has_user()) {
object.values["user"] = task.user();
}
{
JSON::Array array;
array.values.reserve(task.statuses().size()); // MESOS-2353.
foreach (const TaskStatus& status, task.statuses()) {
array.values.push_back(model(status));
}
object.values["statuses"] = std::move(array);
}
if (task.has_labels()) {
object.values["labels"] = model(task.labels());
}
if (task.has_discovery()) {
object.values["discovery"] = JSON::protobuf(task.discovery());
}
if (task.has_container()) {
object.values["container"] = JSON::protobuf(task.container());
}
return object;
}
JSON::Object model(const CommandInfo& command)
{
JSON::Object object;
if (command.has_shell()) {
object.values["shell"] = command.shell();
}
if (command.has_value()) {
object.values["value"] = command.value();
}
JSON::Array argv;
foreach (const string& arg, command.arguments()) {
argv.values.push_back(arg);
}
object.values["argv"] = argv;
if (command.has_environment()) {
JSON::Object environment;
JSON::Array variables;
foreach (const Environment_Variable& variable,
command.environment().variables()) {
JSON::Object variableObject;
variableObject.values["name"] = variable.name();
variableObject.values["value"] = variable.value();
variables.values.push_back(variableObject);
}
environment.values["variables"] = variables;
object.values["environment"] = environment;
}
JSON::Array uris;
foreach (const CommandInfo_URI& uri, command.uris()) {
JSON::Object uriObject;
uriObject.values["value"] = uri.value();
uriObject.values["executable"] = uri.executable();
uris.values.push_back(uriObject);
}
object.values["uris"] = uris;
return object;
}
JSON::Object model(const ExecutorInfo& executorInfo)
{
JSON::Object object;
object.values["executor_id"] = executorInfo.executor_id().value();
object.values["name"] = executorInfo.name();
object.values["framework_id"] = executorInfo.framework_id().value();
object.values["command"] = model(executorInfo.command());
object.values["resources"] = model(executorInfo.resources());
if (executorInfo.has_labels()) {
object.values["labels"] = model(executorInfo.labels());
}
return object;
}
// Returns JSON representation of a FileInfo protobuf message.
// Example JSON:
// {
// 'path': '\/some\/file',
// 'mode': '-rwxrwxrwx',
// 'nlink': 5,
// 'uid': 'bmahler',
// 'gid': 'employee',
// 'size': 4096, // Bytes.
// 'mtime': 1348258116, // Unix timestamp.
// }
JSON::Object model(const FileInfo& fileInfo)
{
JSON::Object file;
file.values["path"] = fileInfo.path();
file.values["nlink"] = fileInfo.nlink();
file.values["size"] = fileInfo.size();
file.values["mtime"] = Nanoseconds(fileInfo.mtime().nanoseconds()).secs();
char filetype;
if (S_ISREG(fileInfo.mode())) {
filetype = '-';
} else if (S_ISDIR(fileInfo.mode())) {
filetype = 'd';
} else if (S_ISCHR(fileInfo.mode())) {
filetype = 'c';
} else if (S_ISBLK(fileInfo.mode())) {
filetype = 'b';
} else if (S_ISFIFO(fileInfo.mode())) {
filetype = 'p';
} else if (S_ISLNK(fileInfo.mode())) {
filetype = 'l';
} else if (S_ISSOCK(fileInfo.mode())) {
filetype = 's';
} else {
filetype = '-';
}
struct os::Permissions permissions(fileInfo.mode());
file.values["mode"] = strings::format(
"%c%c%c%c%c%c%c%c%c%c",
filetype,
permissions.owner.r ? 'r' : '-',
permissions.owner.w ? 'w' : '-',
permissions.owner.x ? 'x' : '-',
permissions.group.r ? 'r' : '-',
permissions.group.w ? 'w' : '-',
permissions.group.x ? 'x' : '-',
permissions.others.r ? 'r' : '-',
permissions.others.w ? 'w' : '-',
permissions.others.x ? 'x' : '-').get();
file.values["uid"] = fileInfo.uid();
file.values["gid"] = fileInfo.gid();
return file;
}
JSON::Object model(const quota::QuotaInfo& quotaInfo)
{
JSON::Object object;
object.values["guarantee"] = model(quotaInfo.guarantee());
object.values["role"] = quotaInfo.role();
if (quotaInfo.has_principal()) {
object.values["principal"] = quotaInfo.principal();
}
return object;
}
} // namespace internal {
void json(JSON::ObjectWriter* writer, const Attributes& attributes)
{
foreach (const Attribute& attribute, attributes) {
switch (attribute.type()) {
case Value::SCALAR:
writer->field(attribute.name(), attribute.scalar());
break;
case Value::RANGES:
writer->field(attribute.name(), attribute.ranges());
break;
case Value::SET:
writer->field(attribute.name(), attribute.set());
break;
case Value::TEXT:
writer->field(attribute.name(), attribute.text());
break;
default:
LOG(FATAL) << "Unexpected Value type: " << attribute.type();
}
}
}
void json(JSON::ObjectWriter* writer, const CommandInfo& command)
{
if (command.has_shell()) {
writer->field("shell", command.shell());
}
if (command.has_value()) {
writer->field("value", command.value());
}
writer->field("argv", command.arguments());
if (command.has_environment()) {
writer->field("environment", JSON::Protobuf(command.environment()));
}
writer->field("uris", [&command](JSON::ArrayWriter* writer) {
foreach (const CommandInfo::URI& uri, command.uris()) {
writer->element([&uri](JSON::ObjectWriter* writer) {
writer->field("value", uri.value());
writer->field("executable", uri.executable());
});
}
});
}
static void json(JSON::ObjectWriter* writer, const ContainerStatus& status)
{
if (status.has_container_id()) {
writer->field("container_id", JSON::Protobuf(status.container_id()));
}
if (status.network_infos().size() > 0) {
writer->field("network_infos", status.network_infos());
}
if (status.has_cgroup_info()) {
writer->field("cgroup_info", JSON::Protobuf(status.cgroup_info()));
}
}
void json(JSON::ObjectWriter* writer, const ExecutorInfo& executorInfo)
{
writer->field("executor_id", executorInfo.executor_id().value());
writer->field("name", executorInfo.name());
writer->field("framework_id", executorInfo.framework_id().value());
writer->field("command", executorInfo.command());
writer->field("resources", Resources(executorInfo.resources()));
// Resources may be empty for command executors.
if (!executorInfo.resources().empty()) {
// Executors are not allowed to mix resources allocated to
// different roles, see MESOS-6636.
writer->field(
"role",
executorInfo.resources().begin()->allocation_info().role());
}
if (executorInfo.has_labels()) {
writer->field("labels", executorInfo.labels());
}
if (executorInfo.has_type()) {
writer->field("type", ExecutorInfo::Type_Name(executorInfo.type()));
}
}
void json(JSON::ArrayWriter* writer, const Labels& labels)
{
foreach (const Label& label, labels.labels()) {
writer->element(JSON::Protobuf(label));
}
}
static void json(JSON::ObjectWriter* writer, const NetworkInfo& info)
{
if (info.groups().size() > 0) {
writer->field("groups", info.groups());
}
if (info.has_labels()) {
writer->field("labels", info.labels());
}
if (info.ip_addresses().size() > 0) {
writer->field("ip_addresses", [&info](JSON::ArrayWriter* writer) {
foreach (const NetworkInfo::IPAddress& ipAddress, info.ip_addresses()) {
writer->element(JSON::Protobuf(ipAddress));
}
});
}
if (info.has_name()) {
writer->field("name", info.name());
}
if (info.port_mappings().size() > 0) {
writer->field("port_mappings", [&info](JSON::ArrayWriter* writer) {
foreach(const NetworkInfo::PortMapping& portMapping,
info.port_mappings()) {
writer->element(JSON::Protobuf(portMapping));
}
});
}
}
void json(JSON::ObjectWriter* writer, const Resources& resources)
{
hashmap<string, double> scalars =
{{"cpus", 0}, {"gpus", 0}, {"mem", 0}, {"disk", 0}};
hashmap<string, Value::Ranges> ranges;
hashmap<string, Value::Set> sets;
foreach (const Resource& resource, resources) {
string name =
resource.name() + (Resources::isRevocable(resource) ? "_revocable" : "");
switch (resource.type()) {
case Value::SCALAR:
scalars[name] += resource.scalar().value();
break;
case Value::RANGES:
ranges[name] += resource.ranges();
break;
case Value::SET:
sets[name] += resource.set();
break;
default:
LOG(FATAL) << "Unexpected Value type: " << resource.type();
}
}
json(writer, scalars);
json(writer, ranges);
json(writer, sets);
}
void json(JSON::ObjectWriter* writer, const Task& task)
{
writer->field("id", task.task_id().value());
writer->field("name", task.name());
writer->field("framework_id", task.framework_id().value());
writer->field("executor_id", task.executor_id().value());
writer->field("slave_id", task.slave_id().value());
writer->field("state", TaskState_Name(task.state()));
writer->field("resources", Resources(task.resources()));
// Tasks are not allowed to mix resources allocated to
// different roles, see MESOS-6636.
writer->field("role", task.resources().begin()->allocation_info().role());
writer->field("statuses", task.statuses());
if (task.has_user()) {
writer->field("user", task.user());
}
if (task.has_labels()) {
writer->field("labels", task.labels());
}
if (task.has_discovery()) {
writer->field("discovery", JSON::Protobuf(task.discovery()));
}
if (task.has_container()) {
writer->field("container", JSON::Protobuf(task.container()));
}
}
void json(JSON::ObjectWriter* writer, const TaskStatus& status)
{
writer->field("state", TaskState_Name(status.state()));
writer->field("timestamp", status.timestamp());
if (status.has_labels()) {
writer->field("labels", status.labels());
}
if (status.has_container_status()) {
writer->field("container_status", status.container_status());
}
if (status.has_healthy()) {
writer->field("healthy", status.healthy());
}
}
static void json(
JSON::ObjectWriter* writer,
const DomainInfo::FaultDomain::RegionInfo& regionInfo)
{
writer->field("name", regionInfo.name());
}
static void json(
JSON::ObjectWriter* writer,
const DomainInfo::FaultDomain::ZoneInfo& zoneInfo)
{
writer->field("name", zoneInfo.name());
}
static void json(
JSON::ObjectWriter* writer,
const DomainInfo::FaultDomain& faultDomain)
{
writer->field("region", faultDomain.region());
writer->field("zone", faultDomain.zone());
}
void json(JSON::ObjectWriter* writer, const DomainInfo& domainInfo)
{
if (domainInfo.has_fault_domain()) {
writer->field("fault_domain", domainInfo.fault_domain());
}
}
static void json(JSON::NumberWriter* writer, const Value::Scalar& scalar)
{
writer->set(scalar.value());
}
static void json(JSON::StringWriter* writer, const Value::Ranges& ranges)
{
writer->append(stringify(ranges));
}
static void json(JSON::StringWriter* writer, const Value::Set& set)
{
writer->append(stringify(set));
}
static void json(JSON::StringWriter* writer, const Value::Text& text)
{
writer->append(text.value());
}
namespace authorization {
const Option<authorization::Subject> createSubject(
const Option<Principal>& principal)
{
if (principal.isSome()) {
authorization::Subject subject;
if (principal->value.isSome()) {
subject.set_value(principal->value.get());
}
foreachpair (const string& key, const string& value, principal->claims) {
Label* claim = subject.mutable_claims()->mutable_labels()->Add();
claim->set_key(key);
claim->set_value(value);
}
return subject;
}
return None();
}
} // namespace authorization {
const AuthorizationCallbacks createAuthorizationCallbacks(
Authorizer* authorizer)
{
typedef lambda::function<process::Future<bool>(
const process::http::Request& httpRequest,
const Option<Principal>& principal)> Callback;
AuthorizationCallbacks callbacks;
Callback getEndpoint = [authorizer](
const process::http::Request& httpRequest,
const Option<Principal>& principal) -> process::Future<bool> {
const string path = httpRequest.url.path;
if (!internal::AUTHORIZABLE_ENDPOINTS.contains(path)) {
return Failure(
"Endpoint '" + path + "' is not an authorizable endpoint.");
}
authorization::Request authRequest;
authRequest.set_action(mesos::authorization::GET_ENDPOINT_WITH_PATH);
Option<authorization::Subject> subject =
authorization::createSubject(principal);
if (subject.isSome()) {
authRequest.mutable_subject()->CopyFrom(subject.get());
}
authRequest.mutable_object()->set_value(path);
LOG(INFO) << "Authorizing principal '"
<< (principal.isSome() ? stringify(principal.get()) : "ANY")
<< "' to GET the endpoint '" << path << "'";
return authorizer->authorized(authRequest);
};
callbacks.insert(std::make_pair("/logging/toggle", getEndpoint));
callbacks.insert(std::make_pair("/metrics/snapshot", getEndpoint));
return callbacks;
}
bool approveViewFrameworkInfo(
const Owned<ObjectApprover>& frameworksApprover,
const FrameworkInfo& frameworkInfo)
{
Try<bool> approved =
frameworksApprover->approved(ObjectApprover::Object(frameworkInfo));
if (approved.isError()) {
LOG(WARNING) << "Error during FrameworkInfo authorization: "
<< approved.error();
// TODO(joerg84): Consider exposing these errors to the caller.
return false;
}
return approved.get();
}
bool approveViewExecutorInfo(
const Owned<ObjectApprover>& executorsApprover,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo)
{
Try<bool> approved = executorsApprover->approved(
ObjectApprover::Object(executorInfo, frameworkInfo));
if (approved.isError()) {
LOG(WARNING) << "Error during ExecutorInfo authorization: "
<< approved.error();
// TODO(joerg84): Consider exposing these errors to the caller.
return false;
}
return approved.get();
}
bool approveViewTaskInfo(
const Owned<ObjectApprover>& tasksApprover,
const TaskInfo& taskInfo,
const FrameworkInfo& frameworkInfo)
{
Try<bool> approved =
tasksApprover->approved(ObjectApprover::Object(taskInfo, frameworkInfo));
if (approved.isError()) {
LOG(WARNING) << "Error during TaskInfo authorization: " << approved.error();
// TODO(joerg84): Consider exposing these errors to the caller.
return false;
}
return approved.get();
}
bool approveViewTask(
const Owned<ObjectApprover>& tasksApprover,
const Task& task,
const FrameworkInfo& frameworkInfo)
{
Try<bool> approved =
tasksApprover->approved(ObjectApprover::Object(task, frameworkInfo));
if (approved.isError()) {
LOG(WARNING) << "Error during Task authorization: " << approved.error();
// TODO(joerg84): Consider exposing these errors to the caller.
return false;
}
return approved.get();
}
bool approveViewFlags(
const Owned<ObjectApprover>& flagsApprover)
{
Try<bool> approved = flagsApprover->approved(ObjectApprover::Object());
if (approved.isError()) {
LOG(WARNING) << "Error during Flags authorization: " << approved.error();
// TODO(joerg84): Consider exposing these errors to the caller.
return false;
}
return approved.get();
}
process::Future<bool> authorizeEndpoint(
const string& endpoint,
const string& method,
const Option<Authorizer*>& authorizer,
const Option<Principal>& principal)
{
if (authorizer.isNone()) {
return true;
}
authorization::Request request;
// TODO(nfnt): Add an additional case when POST requests
// need to be authorized separately from GET requests.
if (method == "GET") {
request.set_action(authorization::GET_ENDPOINT_WITH_PATH);
} else {
return Failure("Unexpected request method '" + method + "'");
}
if (!internal::AUTHORIZABLE_ENDPOINTS.contains(endpoint)) {
return Failure(
"Endpoint '" + endpoint + "' is not an authorizable endpoint.");
}
Option<authorization::Subject> subject =
authorization::createSubject(principal);
if (subject.isSome()) {
request.mutable_subject()->CopyFrom(subject.get());
}
request.mutable_object()->set_value(endpoint);
LOG(INFO) << "Authorizing principal '"
<< (principal.isSome() ? stringify(principal.get()) : "ANY")
<< "' to " << method
<< " the '" << endpoint << "' endpoint";
return authorizer.get()->authorized(request);
}
bool approveViewRole(
const Owned<ObjectApprover>& rolesApprover,
const string& role)
{
Try<bool> approved = rolesApprover->approved(ObjectApprover::Object(role));
if (approved.isError()) {
LOG(WARNING) << "Error during Roles authorization: " << approved.error();
// TODO(joerg84): Consider exposing these errors to the caller.
return false;
}
return approved.get();
}
bool authorizeResource(
const Resource& resource,
const Option<Owned<AuthorizationAcceptor>>& acceptor)
{
if (acceptor.isNone()) {
return true;
}
// Necessary because recovered agents are presented in old format.
if (resource.has_role() && resource.role() != "*" &&
!acceptor.get()->accept(resource.role())) {
return false;
}
if (resource.has_allocation_info() &&
!acceptor.get()->accept(resource.allocation_info().role())) {
return false;
}
// Reservations follow a path model where each entry is a child of the
// previous one. Therefore, to accept the resource the acceptor has to
// accept all entries.
foreach (Resource::ReservationInfo reservation, resource.reservations()) {
if (!acceptor.get()->accept(reservation.role())) {
return false;
}
}
return true;
}
namespace {
Result<process::http::authentication::Authenticator*> createBasicAuthenticator(
const string& realm,
const string& authenticatorName,
const Option<Credentials>& credentials)
{
if (credentials.isNone()) {
return Error(
"No credentials provided for the default '" +
string(internal::DEFAULT_BASIC_HTTP_AUTHENTICATOR) +
"' HTTP authenticator for realm '" + realm + "'");
}
LOG(INFO) << "Creating default '"
<< internal::DEFAULT_BASIC_HTTP_AUTHENTICATOR
<< "' HTTP authenticator for realm '" << realm << "'";
return BasicAuthenticatorFactory::create(realm, credentials.get());
}
#ifdef USE_SSL_SOCKET
Result<process::http::authentication::Authenticator*> createJWTAuthenticator(
const string& realm,
const string& authenticatorName,
const Option<string>& secretKey)
{
if (secretKey.isNone()) {
return Error(
"No secret key provided for the default '" +
string(internal::DEFAULT_JWT_HTTP_AUTHENTICATOR) +
"' HTTP authenticator for realm '" + realm + "'");
}
LOG(INFO) << "Creating default '"
<< internal::DEFAULT_JWT_HTTP_AUTHENTICATOR
<< "' HTTP authenticator for realm '" << realm << "'";
return new JWTAuthenticator(realm, secretKey.get());
}
#endif // USE_SSL_SOCKET
Result<process::http::authentication::Authenticator*> createCustomAuthenticator(
const string& realm,
const string& authenticatorName)
{
if (!modules::ModuleManager::contains<
process::http::authentication::Authenticator>(authenticatorName)) {
return Error(
"HTTP authenticator '" + authenticatorName + "' not found. "
"Check the spelling (compare to '" +
string(internal::DEFAULT_BASIC_HTTP_AUTHENTICATOR) +
"') or verify that the authenticator was loaded "
"successfully (see --modules)");
}
LOG(INFO) << "Creating '" << authenticatorName << "' HTTP authenticator "
<< "for realm '" << realm << "'";
return modules::ModuleManager::create<
process::http::authentication::Authenticator>(authenticatorName);
}
} // namespace {
Try<Nothing> initializeHttpAuthenticators(
const string& realm,
const vector<string>& authenticatorNames,
const Option<Credentials>& credentials,
const Option<string>& secretKey)
{
if (authenticatorNames.empty()) {
return Error(
"No HTTP authenticators specified for realm '" + realm + "'");
}
Option<process::http::authentication::Authenticator*> authenticator;
if (authenticatorNames.size() == 1) {
Result<process::http::authentication::Authenticator*> authenticator_ =
None();
if (authenticatorNames[0] == internal::DEFAULT_BASIC_HTTP_AUTHENTICATOR) {
authenticator_ =
createBasicAuthenticator(realm, authenticatorNames[0], credentials);
#ifdef USE_SSL_SOCKET
} else if (
authenticatorNames[0] == internal::DEFAULT_JWT_HTTP_AUTHENTICATOR) {
authenticator_ =
createJWTAuthenticator(realm, authenticatorNames[0], secretKey);
#endif // USE_SSL_SOCKET
} else {
authenticator_ = createCustomAuthenticator(realm, authenticatorNames[0]);
}
if (authenticator_.isError()) {
return Error(
"Failed to create HTTP authenticator module '" +
authenticatorNames[0] + "': " + authenticator_.error());
}
CHECK_SOME(authenticator_);
authenticator = authenticator_.get();
} else {
// There are multiple authenticators loaded for this realm,
// so construct a `CombinedAuthenticator` to handle them.
vector<Owned<process::http::authentication::Authenticator>> authenticators;
foreach (const string& name, authenticatorNames) {
Result<process::http::authentication::Authenticator*> authenticator_ =
None();
if (name == internal::DEFAULT_BASIC_HTTP_AUTHENTICATOR) {
authenticator_ = createBasicAuthenticator(realm, name, credentials);
#ifdef USE_SSL_SOCKET
} else if (name == internal::DEFAULT_JWT_HTTP_AUTHENTICATOR) {
authenticator_ = createJWTAuthenticator(realm, name, secretKey);
#endif // USE_SSL_SOCKET
} else {
authenticator_ = createCustomAuthenticator(realm, name);
}
if (authenticator_.isError()) {
return Error(
"Failed to create HTTP authenticator module '" +
name + "': " + authenticator_.error());
}
CHECK_SOME(authenticator_);
authenticators.push_back(
Owned<process::http::authentication::Authenticator>(
authenticator_.get()));
}
authenticator = new CombinedAuthenticator(realm, std::move(authenticators));
}
CHECK(authenticator.isSome());
// Ownership of the authenticator is passed to libprocess.
process::http::authentication::setAuthenticator(
realm, Owned<process::http::authentication::Authenticator>(
authenticator.get()));
return Nothing();
}
void logRequest(const process::http::Request& request)
{
Option<string> userAgent = request.headers.get("User-Agent");
Option<string> forwardedFor = request.headers.get("X-Forwarded-For");
LOG(INFO) << "HTTP " << request.method << " for " << request.url
<< (request.client.isSome()
? " from " + stringify(request.client.get())
: "")
<< (userAgent.isSome()
? " with User-Agent='" + userAgent.get() + "'"
: "")
<< (forwardedFor.isSome()
? " with X-Forwarded-For='" + forwardedFor.get() + "'"
: "");
}
Future<Owned<AuthorizationAcceptor>> AuthorizationAcceptor::create(
const Option<Principal>& principal,
const Option<Authorizer*>& authorizer,
const authorization::Action& action)
{
if (authorizer.isNone()) {
return Owned<AuthorizationAcceptor>(
new AuthorizationAcceptor(Owned<ObjectApprover>(
new AcceptingObjectApprover())));
}
const Option<authorization::Subject> subject =
authorization::createSubject(principal);
return authorizer.get()->getObjectApprover(subject, action)
.then([=](const Owned<ObjectApprover>& approver) {
return Owned<AuthorizationAcceptor>(
new AuthorizationAcceptor(approver));
});
}
} // namespace mesos {