blob: 44e1576d4462295d404429d51682134306047462 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "resource_provider/manager.hpp"
#include <glog/logging.h>
#include <string>
#include <utility>
#include <mesos/http.hpp>
#include <mesos/resource_provider/resource_provider.hpp>
#include <mesos/v1/resource_provider/resource_provider.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <stout/hashmap.hpp>
#include <stout/protobuf.hpp>
#include <stout/uuid.hpp>
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "resource_provider/validation.hpp"
namespace http = process::http;
using mesos::internal::resource_provider::validation::call::validate;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::ProcessBase;
using process::Queue;
using process::dispatch;
using process::spawn;
using process::terminate;
using process::wait;
using process::http::Accepted;
using process::http::BadRequest;
using process::http::OK;
using process::http::MethodNotAllowed;
using process::http::NotAcceptable;
using process::http::NotImplemented;
using process::http::Pipe;
using process::http::UnsupportedMediaType;
using process::http::authentication::Principal;
using std::string;
namespace mesos {
namespace internal {
// Represents the streaming HTTP connection to a resource provider.
struct HttpConnection
{
HttpConnection(const http::Pipe::Writer& _writer,
ContentType _contentType)
: writer(_writer),
contentType(_contentType),
encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
// Converts the message to an Event before sending.
template <typename Message>
bool send(const Message& message)
{
// We need to evolve the internal 'message' into a
// 'v1::resource_provider::Event'.
return writer.write(encoder.encode(evolve(message)));
}
bool close()
{
return writer.close();
}
Future<Nothing> closed() const
{
return writer.readerClosed();
}
http::Pipe::Writer writer;
ContentType contentType;
::recordio::Encoder<v1::resource_provider::Event> encoder;
};
// State kept for a subscribed resource provider: the provider's
// info (including its ID) together with the HTTP connection that is
// used to send events to it.
struct ResourceProvider
{
  ResourceProvider(
      const ResourceProviderInfo& _info,
      const HttpConnection& _http)
    : info(_info),
      http(_http) {}

  // Description of the resource provider.
  ResourceProviderInfo info;

  // Streaming connection used for sending events to the provider.
  HttpConnection http;
};
// Process that implements the resource provider manager: it serves
// the resource provider HTTP API and keeps track of the resource
// providers that have subscribed.
class ResourceProviderManagerProcess
  : public Process<ResourceProviderManagerProcess>
{
public:
  ResourceProviderManagerProcess();

  // Handles a single request against the resource provider API and
  // returns the HTTP response for it.
  Future<http::Response> api(
      const http::Request& request,
      const Option<Principal>& principal);

  // Queue of messages exposed through `ResourceProviderManager`.
  Queue<ResourceProviderMessage> messages;

  // Subscribed resource providers, keyed by their ID.
  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;

private:
  // Handler for the 'SUBSCRIBE' call.
  void subscribe(
      const HttpConnection& http,
      const Call::Subscribe& subscribe);

  // Handler for the 'UPDATE' call.
  void update(
      ResourceProvider* resourceProvider,
      const Call::Update& update);

  // Generates an ID for a newly subscribing resource provider.
  ResourceProviderID newResourceProviderId();
};
// Constructs the process with an ID generated from the
// "resource-provider-manager" prefix.
ResourceProviderManagerProcess::ResourceProviderManagerProcess()
  : ProcessBase(process::ID::generate("resource-provider-manager")) {}
// Entry point for the resource provider HTTP API.
//
// The request is processed in the following steps:
//   1. Only 'POST' is allowed (otherwise 'MethodNotAllowed').
//   2. The body is parsed into a 'v1::resource_provider::Call'
//      according to 'Content-Type' (protobuf or JSON); a missing
//      header or a malformed body yields 'BadRequest', any other
//      content type 'UnsupportedMediaType'.
//   3. The call is devolved and validated ('BadRequest' on failure).
//   4. The response content type is negotiated from 'Accept', with
//      JSON checked before protobuf ('NotAcceptable' if neither is
//      accepted).
//   5. The call is dispatched on its type:
//        - UNKNOWN:   'NotImplemented'.
//        - SUBSCRIBE: a streaming 'OK' response backed by a pipe.
//        - UPDATE:    'Accepted', or 'BadRequest' if the resource
//                     provider is not known.
//
// `principal` is currently not used by this handler.
Future<http::Response> ResourceProviderManagerProcess::api(
    const http::Request& request,
    const Option<Principal>& principal)
{
  if (request.method != "POST") {
    return MethodNotAllowed({"POST"}, request.method);
  }

  v1::resource_provider::Call v1Call;

  // TODO(anand): Content type values are case-insensitive.
  Option<string> contentType = request.headers.get("Content-Type");

  if (contentType.isNone()) {
    return BadRequest("Expecting 'Content-Type' to be present");
  }

  if (contentType.get() == APPLICATION_PROTOBUF) {
    if (!v1Call.ParseFromString(request.body)) {
      return BadRequest("Failed to parse body into Call protobuf");
    }
  } else if (contentType.get() == APPLICATION_JSON) {
    Try<JSON::Value> value = JSON::parse(request.body);
    if (value.isError()) {
      return BadRequest("Failed to parse body into JSON: " + value.error());
    }

    Try<v1::resource_provider::Call> parse =
      ::protobuf::parse<v1::resource_provider::Call>(value.get());

    if (parse.isError()) {
      return BadRequest("Failed to convert JSON into Call protobuf: " +
                        parse.error());
    }

    v1Call = parse.get();
  } else {
    return UnsupportedMediaType(
        string("Expecting 'Content-Type' of ") +
        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
  }

  Call call = devolve(v1Call);

  Option<Error> error = validate(call);
  if (error.isSome()) {
    return BadRequest(
        "Failed to validate resource_provider::Call: " + error->message);
  }

  ContentType acceptType;
  if (request.acceptsMediaType(APPLICATION_JSON)) {
    acceptType = ContentType::JSON;
  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
    acceptType = ContentType::PROTOBUF;
  } else {
    return NotAcceptable(
        string("Expecting 'Accept' to allow ") +
        "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
  }

  switch (call.type()) {
    case Call::UNKNOWN: {
      return NotImplemented();
    }

    case Call::SUBSCRIBE: {
      // The response body is streamed: the reader end of the pipe is
      // handed to the response while the writer end is kept in the
      // connection used for sending events.
      Pipe pipe;
      OK ok;

      ok.headers["Content-Type"] = stringify(acceptType);
      ok.type = http::Response::PIPE;
      ok.reader = pipe.reader();

      HttpConnection http(pipe.writer(), acceptType);
      subscribe(http, call.subscribe());

      return ok;
    }

    case Call::UPDATE: {
      if (!resourceProviders.contains(call.resource_provider_id())) {
        return BadRequest("Resource provider cannot be found");
      }

      // Bind a reference to the registered entry. Taking a copy here
      // would hand `update()` a pointer to a temporary, so any change
      // made by the handler would be lost when this scope ends.
      ResourceProvider& resourceProvider =
        resourceProviders.at(call.resource_provider_id());

      update(&resourceProvider, call.update());

      return Accepted();
    }
  }

  UNREACHABLE();
}
// Handles the 'SUBSCRIBE' call.
//
// A copy of the provided resource provider info is assigned a newly
// generated ID, and a 'SUBSCRIBED' event carrying that ID is sent
// over `http`. The resource provider is registered in
// `resourceProviders` only if the event could be sent.
void ResourceProviderManagerProcess::subscribe(
    const HttpConnection& http,
    const Call::Subscribe& subscribe)
{
  ResourceProviderInfo resourceProviderInfo =
    subscribe.resource_provider_info();

  resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());

  ResourceProvider resourceProvider(resourceProviderInfo, http);

  Event event;
  event.set_type(Event::SUBSCRIBED);
  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
      resourceProvider.info.id());

  if (!resourceProvider.http.send(event)) {
    LOG(WARNING) << "Unable to send event to resource provider "
                 << stringify(resourceProvider.info.id())
                 << ": connection closed";

    // The resource provider never received its ID, so it cannot
    // refer to this subscription later. Do not register it;
    // otherwise the entry would stay in the map with nothing able
    // to address it.
    return;
  }

  resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
}
// Handles the 'UPDATE' call for an already subscribed resource
// provider. This is currently a no-op placeholder.
void ResourceProviderManagerProcess::update(
    ResourceProvider* resourceProvider,
    const Call::Update& update)
{
  // TODO(nfnt): Implement the 'UPDATE' call handler.
}
// Creates a new resource provider ID whose value is the string form
// of a random UUID.
ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId()
{
  const string uuid = UUID::random().toString();

  ResourceProviderID id;
  id.set_value(uuid);

  return id;
}
// Creates the backing manager process and spawns it. The process
// pointer is checked to be non-null before spawning.
ResourceProviderManager::ResourceProviderManager()
  : process(new ResourceProviderManagerProcess())
{
  spawn(CHECK_NOTNULL(process.get()));
}
// Terminates the backing manager process and then waits on it
// before the manager itself is destroyed.
ResourceProviderManager::~ResourceProviderManager()
{
  terminate(process.get());
  wait(process.get());
}
// Forwards an API request to the manager process by dispatching to
// `ResourceProviderManagerProcess::api` and returns the resulting
// future response.
Future<http::Response> ResourceProviderManager::api(
    const http::Request& request,
    const Option<Principal>& principal) const
{
  return dispatch(
      process.get(),
      &ResourceProviderManagerProcess::api,
      request,
      principal);
}
// Returns the message queue held by the manager process.
Queue<ResourceProviderMessage> ResourceProviderManager::messages() const
{
  return process->messages;
}
} // namespace internal {
} // namespace mesos {