| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "resource_provider/manager.hpp" |
| |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include <mesos/http.hpp> |
| |
| #include <mesos/resource_provider/resource_provider.hpp> |
| |
| #include <mesos/v1/resource_provider/resource_provider.hpp> |
| |
| #include <process/collect.hpp> |
| #include <process/dispatch.hpp> |
| #include <process/id.hpp> |
| #include <process/process.hpp> |
| |
| #include <process/metrics/pull_gauge.hpp> |
| #include <process/metrics/metrics.hpp> |
| |
| #include <stout/hashmap.hpp> |
| #include <stout/nothing.hpp> |
| #include <stout/protobuf.hpp> |
| #include <stout/uuid.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/protobuf_utils.hpp" |
| #include "common/recordio.hpp" |
| #include "common/resources_utils.hpp" |
| |
| #include "internal/devolve.hpp" |
| #include "internal/evolve.hpp" |
| |
| #include "resource_provider/registry.hpp" |
| #include "resource_provider/validation.hpp" |
| |
| namespace http = process::http; |
| |
| using std::string; |
| using std::vector; |
| |
| using mesos::internal::resource_provider::validation::call::validate; |
| |
| using mesos::resource_provider::AdmitResourceProvider; |
| using mesos::resource_provider::Call; |
| using mesos::resource_provider::Event; |
| using mesos::resource_provider::Registrar; |
| |
| using mesos::resource_provider::registry::Registry; |
| |
| using process::Failure; |
| using process::Future; |
| using process::Owned; |
| using process::Process; |
| using process::ProcessBase; |
| using process::Promise; |
| using process::Queue; |
| |
| using process::collect; |
| using process::dispatch; |
| using process::spawn; |
| using process::terminate; |
| using process::wait; |
| |
| using process::http::Accepted; |
| using process::http::BadRequest; |
| using process::http::MethodNotAllowed; |
| using process::http::NotAcceptable; |
| using process::http::NotImplemented; |
| using process::http::OK; |
| using process::http::Pipe; |
| using process::http::UnsupportedMediaType; |
| |
| using process::http::authentication::Principal; |
| |
| using process::metrics::PullGauge; |
| |
| namespace mesos { |
| namespace internal { |
| |
| mesos::resource_provider::registry::ResourceProvider |
| createRegistryResourceProvider(const ResourceProviderInfo& resourceProviderInfo) |
| { |
| mesos::resource_provider::registry::ResourceProvider resourceProvider; |
| |
| CHECK(resourceProviderInfo.has_id()); |
| resourceProvider.mutable_id()->CopyFrom(resourceProviderInfo.id()); |
| |
| resourceProvider.set_name(resourceProviderInfo.name()); |
| resourceProvider.set_type(resourceProviderInfo.type()); |
| |
| return resourceProvider; |
| } |
| |
| // Represents the streaming HTTP connection to a resource provider. |
| struct HttpConnection |
| { |
| HttpConnection(const http::Pipe::Writer& _writer, |
| ContentType _contentType, |
| id::UUID _streamId) |
| : writer(_writer), |
| contentType(_contentType), |
| streamId(_streamId), |
| encoder(lambda::bind(serialize, contentType, lambda::_1)) {} |
| |
| // Converts the message to an Event before sending. |
| template <typename Message> |
| bool send(const Message& message) |
| { |
| // We need to evolve the internal 'message' into a |
| // 'v1::resource_provider::Event'. |
| return writer.write(encoder.encode(evolve(message))); |
| } |
| |
| bool close() |
| { |
| return writer.close(); |
| } |
| |
| Future<Nothing> closed() const |
| { |
| return writer.readerClosed(); |
| } |
| |
| http::Pipe::Writer writer; |
| ContentType contentType; |
| id::UUID streamId; |
| ::recordio::Encoder<v1::resource_provider::Event> encoder; |
| }; |
| |
| |
| struct ResourceProvider |
| { |
| ResourceProvider( |
| const ResourceProviderInfo& _info, |
| const HttpConnection& _http) |
| : info(_info), |
| http(_http) {} |
| |
| ~ResourceProvider() |
| { |
| LOG(INFO) << "Terminating resource provider " << info.id(); |
| |
| http.close(); |
| |
| foreachvalue (const Owned<Promise<Nothing>>& publish, publishes) { |
| publish->fail( |
| "Failed to publish resources from resource provider " + |
| stringify(info.id()) + ": Connection closed"); |
| } |
| } |
| |
| ResourceProviderInfo info; |
| HttpConnection http; |
| hashmap<UUID, Owned<Promise<Nothing>>> publishes; |
| }; |
| |
| |
| class ResourceProviderManagerProcess |
| : public Process<ResourceProviderManagerProcess> |
| { |
| public: |
| ResourceProviderManagerProcess(Owned<Registrar> _registrar); |
| |
| Future<http::Response> api( |
| const http::Request& request, |
| const Option<Principal>& principal); |
| |
| void applyOperation(const ApplyOperationMessage& message); |
| |
| void acknowledgeOperationStatus( |
| const AcknowledgeOperationStatusMessage& message); |
| |
| void reconcileOperations(const ReconcileOperationsMessage& message); |
| |
| Future<Nothing> publishResources(const Resources& resources); |
| |
| Queue<ResourceProviderMessage> messages; |
| |
| private: |
| void subscribe( |
| const HttpConnection& http, |
| const Call::Subscribe& subscribe); |
| |
| void _subscribe( |
| const Future<bool>& admitResourceProvider, |
| Owned<ResourceProvider> resourceProvider); |
| |
| void updateOperationStatus( |
| ResourceProvider* resourceProvider, |
| const Call::UpdateOperationStatus& update); |
| |
| void updateState( |
| ResourceProvider* resourceProvider, |
| const Call::UpdateState& update); |
| |
| void updatePublishResourcesStatus( |
| ResourceProvider* resourceProvider, |
| const Call::UpdatePublishResourcesStatus& update); |
| |
| Future<Nothing> recover( |
| const mesos::resource_provider::registry::Registry& registry); |
| |
| void initialize() override; |
| |
| ResourceProviderID newResourceProviderId(); |
| |
| double gaugeSubscribed(); |
| |
| struct ResourceProviders |
| { |
| hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed; |
| hashmap< |
| ResourceProviderID, |
| mesos::resource_provider::registry::ResourceProvider> |
| known; |
| } resourceProviders; |
| |
| struct Metrics |
| { |
| explicit Metrics(const ResourceProviderManagerProcess& manager); |
| ~Metrics(); |
| |
| PullGauge subscribed; |
| }; |
| |
| Owned<Registrar> registrar; |
| Promise<Nothing> recovered; |
| |
| Metrics metrics; |
| }; |
| |
| |
| ResourceProviderManagerProcess::ResourceProviderManagerProcess( |
| Owned<Registrar> _registrar) |
| : ProcessBase(process::ID::generate("resource-provider-manager")), |
| registrar(std::move(_registrar)), |
| metrics(*this) |
| { |
| CHECK_NOTNULL(registrar.get()); |
| } |
| |
| |
| void ResourceProviderManagerProcess::initialize() |
| { |
| // Recover the registrar. |
| registrar->recover() |
| .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1)) |
| .onAny([](const Future<Nothing>& recovered) { |
| if (!recovered.isReady()) { |
| LOG(FATAL) |
| << "Failed to recover resource provider manager registry: " |
| << recovered; |
| } |
| }); |
| } |
| |
| |
| Future<Nothing> ResourceProviderManagerProcess::recover( |
| const mesos::resource_provider::registry::Registry& registry) |
| { |
| foreach ( |
| const mesos::resource_provider::registry::ResourceProvider& |
| resourceProvider, |
| registry.resource_providers()) { |
| resourceProviders.known.put(resourceProvider.id(), resourceProvider); |
| } |
| |
| recovered.set(Nothing()); |
| |
| return Nothing(); |
| } |
| |
| |
| Future<http::Response> ResourceProviderManagerProcess::api( |
| const http::Request& request, |
| const Option<Principal>& principal) |
| { |
| // TODO(bbannier): This implementation does not limit the number of messages |
| // in the actor's inbox which could become large should a big number of |
| // resource providers attempt to subscribe before recovery completed. Consider |
| // rejecting requests until the resource provider manager has recovered. This |
| // would likely require implementing retry logic in resource providers. |
| return recovered.future().then(defer( |
| self(), [this, request, principal](const Nothing&) -> http::Response { |
| if (request.method != "POST") { |
| return MethodNotAllowed({"POST"}, request.method); |
| } |
| |
| v1::resource_provider::Call v1Call; |
| |
| // TODO(anand): Content type values are case-insensitive. |
| Option<string> contentType = request.headers.get("Content-Type"); |
| |
| if (contentType.isNone()) { |
| return BadRequest("Expecting 'Content-Type' to be present"); |
| } |
| |
| if (contentType.get() == APPLICATION_PROTOBUF) { |
| if (!v1Call.ParseFromString(request.body)) { |
| return BadRequest("Failed to parse body into Call protobuf"); |
| } |
| } else if (contentType.get() == APPLICATION_JSON) { |
| Try<JSON::Value> value = JSON::parse(request.body); |
| if (value.isError()) { |
| return BadRequest( |
| "Failed to parse body into JSON: " + value.error()); |
| } |
| |
| Try<v1::resource_provider::Call> parse = |
| ::protobuf::parse<v1::resource_provider::Call>(value.get()); |
| |
| if (parse.isError()) { |
| return BadRequest( |
| "Failed to convert JSON into Call protobuf: " + parse.error()); |
| } |
| |
| v1Call = parse.get(); |
| } else { |
| return UnsupportedMediaType( |
| string("Expecting 'Content-Type' of ") + APPLICATION_JSON + |
| " or " + APPLICATION_PROTOBUF); |
| } |
| |
| Call call = devolve(v1Call); |
| |
| Option<Error> error = validate(call); |
| if (error.isSome()) { |
| return BadRequest( |
| "Failed to validate resource_provider::Call: " + error->message); |
| } |
| |
| if (call.type() == Call::SUBSCRIBE) { |
| // We default to JSON 'Content-Type' in the response since an empty |
| // 'Accept' header results in all media types considered acceptable. |
| ContentType acceptType = ContentType::JSON; |
| |
| if (request.acceptsMediaType(APPLICATION_JSON)) { |
| acceptType = ContentType::JSON; |
| } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { |
| acceptType = ContentType::PROTOBUF; |
| } else { |
| return NotAcceptable( |
| string("Expecting 'Accept' to allow ") + "'" + |
| APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'"); |
| } |
| |
| if (request.headers.contains("Mesos-Stream-Id")) { |
| return BadRequest( |
| "Subscribe calls should not include the 'Mesos-Stream-Id' " |
| "header"); |
| } |
| |
| Pipe pipe; |
| OK ok; |
| |
| ok.headers["Content-Type"] = stringify(acceptType); |
| ok.type = http::Response::PIPE; |
| ok.reader = pipe.reader(); |
| |
| // Generate a stream ID and return it in the response. |
| id::UUID streamId = id::UUID::random(); |
| ok.headers["Mesos-Stream-Id"] = streamId.toString(); |
| |
| HttpConnection http(pipe.writer(), acceptType, streamId); |
| this->subscribe(http, call.subscribe()); |
| |
| return std::move(ok); |
| } |
| |
| if (!this->resourceProviders.subscribed.contains( |
| call.resource_provider_id())) { |
| return BadRequest("Resource provider is not subscribed"); |
| } |
| |
| ResourceProvider* resourceProvider = |
| this->resourceProviders.subscribed.at(call.resource_provider_id()) |
| .get(); |
| |
| // This isn't a `SUBSCRIBE` call, so the request should include a stream |
| // ID. |
| if (!request.headers.contains("Mesos-Stream-Id")) { |
| return BadRequest( |
| "All non-subscribe calls should include to 'Mesos-Stream-Id' " |
| "header"); |
| } |
| |
| const string& streamId = request.headers.at("Mesos-Stream-Id"); |
| if (streamId != resourceProvider->http.streamId.toString()) { |
| return BadRequest( |
| "The stream ID '" + streamId + |
| "' included in this request " |
| "didn't match the stream ID currently associated with " |
| " resource provider ID " + |
| resourceProvider->info.id().value()); |
| } |
| |
| switch (call.type()) { |
| case Call::UNKNOWN: { |
| return NotImplemented(); |
| } |
| |
| case Call::SUBSCRIBE: { |
| // `SUBSCRIBE` call should have been handled above. |
| LOG(FATAL) << "Unexpected 'SUBSCRIBE' call"; |
| } |
| |
| case Call::UPDATE_OPERATION_STATUS: { |
| this->updateOperationStatus( |
| resourceProvider, call.update_operation_status()); |
| |
| return Accepted(); |
| } |
| |
| case Call::UPDATE_STATE: { |
| this->updateState(resourceProvider, call.update_state()); |
| return Accepted(); |
| } |
| |
| case Call::UPDATE_PUBLISH_RESOURCES_STATUS: { |
| this->updatePublishResourcesStatus( |
| resourceProvider, call.update_publish_resources_status()); |
| return Accepted(); |
| } |
| } |
| |
| UNREACHABLE(); |
| })); |
| } |
| |
| |
| void ResourceProviderManagerProcess::applyOperation( |
| const ApplyOperationMessage& message) |
| { |
| const Offer::Operation& operation = message.operation_info(); |
| const FrameworkID& frameworkId = message.framework_id(); |
| const UUID& operationUUID = message.operation_uuid(); |
| |
| Result<ResourceProviderID> resourceProviderId = |
| getResourceProviderId(operation); |
| |
| if (!resourceProviderId.isSome()) { |
| LOG(ERROR) << "Failed to get the resource provider ID of operation " |
| << "'" << operation.id() << "' (uuid: " << operationUUID |
| << ") from framework " << frameworkId << ": " |
| << (resourceProviderId.isError() ? resourceProviderId.error() |
| : "Not found"); |
| return; |
| } |
| |
| if (!resourceProviders.subscribed.contains(resourceProviderId.get())) { |
| LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: " |
| << operationUUID << ") from framework " << frameworkId |
| << " because resource provider " << resourceProviderId.get() |
| << " is not subscribed"; |
| return; |
| } |
| |
| ResourceProvider* resourceProvider = |
| resourceProviders.subscribed.at(resourceProviderId.get()).get(); |
| |
| CHECK(message.resource_version_uuid().has_resource_provider_id()); |
| |
| CHECK_EQ(message.resource_version_uuid().resource_provider_id(), |
| resourceProviderId.get()) |
| << "Resource provider ID " |
| << message.resource_version_uuid().resource_provider_id() |
| << " in resource version UUID does not match that in the operation " |
| << resourceProviderId.get(); |
| |
| Event event; |
| event.set_type(Event::APPLY_OPERATION); |
| event.mutable_apply_operation() |
| ->mutable_framework_id()->CopyFrom(frameworkId); |
| event.mutable_apply_operation()->mutable_info()->CopyFrom(operation); |
| event.mutable_apply_operation() |
| ->mutable_operation_uuid()->CopyFrom(message.operation_uuid()); |
| event.mutable_apply_operation() |
| ->mutable_resource_version_uuid() |
| ->CopyFrom(message.resource_version_uuid().uuid()); |
| |
| if (!resourceProvider->http.send(event)) { |
| LOG(WARNING) << "Failed to send operation '" << operation.id() << "' " |
| << "(uuid: " << operationUUID << ") from framework " |
| << frameworkId << " to resource provider " |
| << resourceProviderId.get() << ": connection closed"; |
| } |
| } |
| |
| |
| void ResourceProviderManagerProcess::acknowledgeOperationStatus( |
| const AcknowledgeOperationStatusMessage& message) |
| { |
| CHECK(message.has_resource_provider_id()); |
| |
| if (!resourceProviders.subscribed.contains(message.resource_provider_id())) { |
| LOG(WARNING) << "Dropping operation status acknowledgement with" |
| << " status_uuid " << message.status_uuid() << " and" |
| << " operation_uuid " << message.operation_uuid() << " because" |
| << " resource provider " << message.resource_provider_id() |
| << " is not subscribed"; |
| return; |
| } |
| |
| ResourceProvider& resourceProvider = |
| *resourceProviders.subscribed.at(message.resource_provider_id()); |
| |
| Event event; |
| event.set_type(Event::ACKNOWLEDGE_OPERATION_STATUS); |
| event.mutable_acknowledge_operation_status() |
| ->mutable_status_uuid() |
| ->CopyFrom(message.status_uuid()); |
| event.mutable_acknowledge_operation_status() |
| ->mutable_operation_uuid() |
| ->CopyFrom(message.operation_uuid()); |
| |
| if (!resourceProvider.http.send(event)) { |
| LOG(WARNING) << "Failed to send operation status acknowledgement with" |
| << " status_uuid " << message.status_uuid() << " and" |
| << " operation_uuid " << message.operation_uuid() << " to" |
| << " resource provider " << message.resource_provider_id() |
| << ": connection closed"; |
| } |
| } |
| |
| |
| void ResourceProviderManagerProcess::reconcileOperations( |
| const ReconcileOperationsMessage& message) |
| { |
| hashmap<ResourceProviderID, Event> events; |
| |
| auto addOperation = |
| [&events](const ReconcileOperationsMessage::Operation& operation) { |
| const ResourceProviderID resourceProviderId = |
| operation.resource_provider_id(); |
| |
| if (events.contains(resourceProviderId)) { |
| events.at(resourceProviderId).mutable_reconcile_operations() |
| ->add_operation_uuids()->CopyFrom(operation.operation_uuid()); |
| } else { |
| Event event; |
| event.set_type(Event::RECONCILE_OPERATIONS); |
| event.mutable_reconcile_operations() |
| ->add_operation_uuids()->CopyFrom(operation.operation_uuid()); |
| |
| events[resourceProviderId] = event; |
| } |
| }; |
| |
| // Construct events for individual resource providers. |
| foreach ( |
| const ReconcileOperationsMessage::Operation& operation, |
| message.operations()) { |
| if (operation.has_resource_provider_id()) { |
| if (!resourceProviders.subscribed.contains( |
| operation.resource_provider_id())) { |
| // TODO(bbannier): We should send `OPERATION_UNREACHABLE` here. |
| LOG(WARNING) << "Dropping operation reconciliation message with" |
| << " operation_uuid " << operation.operation_uuid() |
| << " because resource provider " |
| << operation.resource_provider_id() |
| << " is not subscribed"; |
| continue; |
| } |
| |
| addOperation(operation); |
| } |
| } |
| |
| foreachpair ( |
| const ResourceProviderID& resourceProviderId, |
| const Event& event, |
| events) { |
| CHECK(resourceProviders.subscribed.contains(resourceProviderId)); |
| ResourceProvider& resourceProvider = |
| *resourceProviders.subscribed.at(resourceProviderId); |
| |
| if (!resourceProvider.http.send(event)) { |
| LOG(WARNING) << "Failed to send operation reconciliation event" |
| << " to resource provider " << resourceProviderId |
| << ": connection closed"; |
| } |
| } |
| } |
| |
| |
| Future<Nothing> ResourceProviderManagerProcess::publishResources( |
| const Resources& resources) |
| { |
| hashmap<ResourceProviderID, Resources> providedResources; |
| |
| foreach (const Resource& resource, resources) { |
| // NOTE: We ignore agent default resources here because those |
| // resources do not need publish, and shouldn't be handled by the |
| // resource provider manager. |
| if (!resource.has_provider_id()) { |
| continue; |
| } |
| |
| const ResourceProviderID& resourceProviderId = resource.provider_id(); |
| |
| if (!resourceProviders.subscribed.contains(resourceProviderId)) { |
| // TODO(chhsiao): If the manager is running on an agent and the |
| // resource comes from an external resource provider, we may want |
| // to load the provider's agent component. |
| return Failure( |
| "Resource provider " + stringify(resourceProviderId) + |
| " is not subscribed"); |
| } |
| |
| providedResources[resourceProviderId] += resource; |
| } |
| |
| vector<Future<Nothing>> futures; |
| |
| foreachpair (const ResourceProviderID& resourceProviderId, |
| const Resources& resources, |
| providedResources) { |
| UUID uuid = protobuf::createUUID(); |
| |
| Event event; |
| event.set_type(Event::PUBLISH_RESOURCES); |
| event.mutable_publish_resources()->mutable_uuid()->CopyFrom(uuid); |
| event.mutable_publish_resources()->mutable_resources()->CopyFrom(resources); |
| |
| ResourceProvider* resourceProvider = |
| resourceProviders.subscribed.at(resourceProviderId).get(); |
| |
| LOG(INFO) |
| << "Sending PUBLISH event " << uuid << " with resources '" << resources |
| << "' to resource provider " << resourceProviderId; |
| |
| if (!resourceProvider->http.send(event)) { |
| return Failure( |
| "Failed to send PUBLISH_RESOURCES event to resource provider " + |
| stringify(resourceProviderId) + ": connection closed"); |
| } |
| |
| Owned<Promise<Nothing>> promise(new Promise<Nothing>()); |
| futures.push_back(promise->future()); |
| resourceProvider->publishes.put(uuid, std::move(promise)); |
| } |
| |
| return collect(futures).then([] { return Nothing(); }); |
| } |
| |
| |
| void ResourceProviderManagerProcess::subscribe( |
| const HttpConnection& http, |
| const Call::Subscribe& subscribe) |
| { |
| const ResourceProviderInfo& resourceProviderInfo = |
| subscribe.resource_provider_info(); |
| |
| LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo; |
| |
| // We always create a new `ResourceProvider` struct when a |
| // resource provider subscribes or resubscribes, and replace the |
| // existing `ResourceProvider` if needed. |
| Owned<ResourceProvider> resourceProvider( |
| new ResourceProvider(resourceProviderInfo, http)); |
| |
| Future<bool> admitResourceProvider; |
| |
| if (!resourceProviderInfo.has_id()) { |
| // The resource provider is subscribing for the first time. |
| resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId()); |
| |
| // If we are handing out a new `ResourceProviderID` persist the ID by |
| // triggering a `AdmitResourceProvider` operation on the registrar. |
| admitResourceProvider = |
| registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>( |
| new AdmitResourceProvider( |
| createRegistryResourceProvider(resourceProvider->info)))); |
| } else { |
| // TODO(chhsiao): The resource provider is resubscribing after being |
| // restarted or an agent failover. The 'ResourceProviderInfo' might |
| // have been updated, but its type and name should remain the same. |
| // We should checkpoint its 'type', 'name' and ID, then check if the |
| // resubscription is consistent with the checkpointed record. |
| |
| const ResourceProviderID& resourceProviderId = resourceProviderInfo.id(); |
| |
| if (!resourceProviders.known.contains(resourceProviderId)) { |
| LOG(INFO) |
| << "Dropping resubscription attempt of resource provider with ID " |
| << resourceProviderId |
| << " since it is unknown"; |
| |
| return; |
| } |
| |
| // Check whether the resource provider has change |
| // information which should be static. |
| mesos::resource_provider::registry::ResourceProvider resourceProvider_ = |
| createRegistryResourceProvider(resourceProvider->info); |
| |
| const mesos::resource_provider::registry::ResourceProvider& |
| storedResourceProvider = resourceProviders.known.at(resourceProviderId); |
| |
| if (resourceProvider_ != storedResourceProvider) { |
| LOG(INFO) |
| << "Dropping resubscription attempt of resource provider " |
| << resourceProvider_ |
| << " since it does not match the previous information " |
| << storedResourceProvider; |
| return; |
| } |
| |
| // If the resource provider is known we do not need to admit it |
| // again, and the registrar operation implicitly succeeded. |
| admitResourceProvider = true; |
| } |
| |
| admitResourceProvider.onAny(defer( |
| self(), |
| &ResourceProviderManagerProcess::_subscribe, |
| lambda::_1, |
| std::move(resourceProvider))); |
| } |
| |
| |
| void ResourceProviderManagerProcess::_subscribe( |
| const Future<bool>& admitResourceProvider, |
| Owned<ResourceProvider> resourceProvider) |
| { |
| if (!admitResourceProvider.isReady()) { |
| LOG(INFO) |
| << "Not subscribing resource provider " << resourceProvider->info.id() |
| << " as registry update did not succeed: " << admitResourceProvider; |
| |
| return; |
| } |
| |
| CHECK(admitResourceProvider.get()) |
| << "Could not admit resource provider " << resourceProvider->info.id() |
| << " as registry update was rejected"; |
| |
| const ResourceProviderID& resourceProviderId = resourceProvider->info.id(); |
| |
| Event event; |
| event.set_type(Event::SUBSCRIBED); |
| event.mutable_subscribed()->mutable_provider_id() |
| ->CopyFrom(resourceProviderId); |
| |
| if (!resourceProvider->http.send(event)) { |
| LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider " |
| << resourceProviderId << ": connection closed"; |
| return; |
| } |
| |
| resourceProvider->http.closed() |
| .onAny(defer(self(), [=](const Future<Nothing>& future) { |
| // Iff the remote side closes the HTTP connection, the future will be |
| // ready. We will remove the resource provider in that case. |
| // This side closes the HTTP connection only when removing a resource |
| // provider, therefore we shouldn't try to remove it again here. |
| if (future.isReady()) { |
| CHECK(resourceProviders.subscribed.contains(resourceProviderId)); |
| |
| // NOTE: All pending futures of publish requests for the resource |
| // provider will become failed. |
| resourceProviders.subscribed.erase(resourceProviderId); |
| } |
| |
| ResourceProviderMessage::Disconnect disconnect{resourceProviderId}; |
| |
| ResourceProviderMessage message; |
| message.type = ResourceProviderMessage::Type::DISCONNECT; |
| message.disconnect = std::move(disconnect); |
| |
| messages.put(std::move(message)); |
| })); |
| |
| if (!resourceProviders.known.contains(resourceProviderId)) { |
| mesos::resource_provider::registry::ResourceProvider resourceProvider_ = |
| createRegistryResourceProvider(resourceProvider->info); |
| |
| resourceProviders.known.put( |
| resourceProviderId, |
| std::move(resourceProvider_)); |
| } |
| |
| // TODO(jieyu): Start heartbeat for the resource provider. |
| resourceProviders.subscribed.put( |
| resourceProviderId, |
| std::move(resourceProvider)); |
| } |
| |
| |
| void ResourceProviderManagerProcess::updateOperationStatus( |
| ResourceProvider* resourceProvider, |
| const Call::UpdateOperationStatus& update) |
| { |
| CHECK_EQ(update.status().resource_provider_id(), resourceProvider->info.id()); |
| |
| ResourceProviderMessage::UpdateOperationStatus body; |
| body.update.mutable_status()->CopyFrom(update.status()); |
| body.update.mutable_operation_uuid()->CopyFrom(update.operation_uuid()); |
| |
| if (update.has_framework_id()) { |
| body.update.mutable_framework_id()->CopyFrom(update.framework_id()); |
| } |
| |
| if (update.has_latest_status()) { |
| CHECK_EQ( |
| update.latest_status().resource_provider_id(), |
| resourceProvider->info.id()); |
| |
| body.update.mutable_latest_status()->CopyFrom(update.latest_status()); |
| } |
| |
| ResourceProviderMessage message; |
| message.type = ResourceProviderMessage::Type::UPDATE_OPERATION_STATUS; |
| message.updateOperationStatus = std::move(body); |
| |
| messages.put(std::move(message)); |
| } |
| |
| |
| void ResourceProviderManagerProcess::updateState( |
| ResourceProvider* resourceProvider, |
| const Call::UpdateState& update) |
| { |
| foreach (const Resource& resource, update.resources()) { |
| CHECK_EQ(resource.provider_id(), resourceProvider->info.id()); |
| } |
| |
| // TODO(chhsiao): Report pending operations. |
| |
| hashmap<UUID, Operation> operations; |
| foreach (const Operation &operation, update.operations()) { |
| operations.put(operation.uuid(), operation); |
| } |
| |
| LOG(INFO) |
| << "Received UPDATE_STATE call with resources '" << update.resources() |
| << "' and " << operations.size() << " operations from resource provider " |
| << resourceProvider->info.id(); |
| |
| ResourceProviderMessage::UpdateState updateState{ |
| resourceProvider->info, |
| update.resource_version_uuid(), |
| update.resources(), |
| std::move(operations)}; |
| |
| ResourceProviderMessage message; |
| message.type = ResourceProviderMessage::Type::UPDATE_STATE; |
| message.updateState = std::move(updateState); |
| |
| messages.put(std::move(message)); |
| } |
| |
| |
| void ResourceProviderManagerProcess::updatePublishResourcesStatus( |
| ResourceProvider* resourceProvider, |
| const Call::UpdatePublishResourcesStatus& update) |
| { |
| const UUID& uuid = update.uuid(); |
| |
| if (!resourceProvider->publishes.contains(uuid)) { |
| LOG(ERROR) << "Ignoring UpdatePublishResourcesStatus from resource" |
| << " provider " << resourceProvider->info.id() |
| << " because UUID " << uuid << " is unknown"; |
| return; |
| } |
| |
| LOG(INFO) |
| << "Received UPDATE_PUBLISH_RESOURCES_STATUS call for PUBLISH_RESOURCES" |
| << " event " << uuid << " with " << update.status() |
| << " status from resource provider " << resourceProvider->info.id(); |
| |
| if (update.status() == Call::UpdatePublishResourcesStatus::OK) { |
| resourceProvider->publishes.at(uuid)->set(Nothing()); |
| } else { |
| // TODO(jieyu): Consider to include an error message in |
| // 'UpdatePublishResourcesStatus' and surface that to the caller. |
| resourceProvider->publishes.at(uuid)->fail( |
| "Received " + stringify(update.status()) + " status"); |
| } |
| |
| resourceProvider->publishes.erase(uuid); |
| } |
| |
| |
| ResourceProviderID ResourceProviderManagerProcess::newResourceProviderId() |
| { |
| ResourceProviderID resourceProviderId; |
| resourceProviderId.set_value(id::UUID::random().toString()); |
| return resourceProviderId; |
| } |
| |
| |
| double ResourceProviderManagerProcess::gaugeSubscribed() |
| { |
| return static_cast<double>(resourceProviders.subscribed.size()); |
| } |
| |
| |
| ResourceProviderManagerProcess::Metrics::Metrics( |
| const ResourceProviderManagerProcess& manager) |
| : subscribed( |
| "resource_provider_manager/subscribed", |
| defer(manager, &ResourceProviderManagerProcess::gaugeSubscribed)) |
| { |
| process::metrics::add(subscribed); |
| } |
| |
| |
| ResourceProviderManagerProcess::Metrics::~Metrics() |
| { |
| process::metrics::remove(subscribed); |
| } |
| |
| |
| ResourceProviderManager::ResourceProviderManager(Owned<Registrar> registrar) |
| : process(new ResourceProviderManagerProcess(std::move(registrar))) |
| { |
| spawn(CHECK_NOTNULL(process.get())); |
| } |
| |
| |
| ResourceProviderManager::~ResourceProviderManager() |
| { |
| terminate(process.get()); |
| wait(process.get()); |
| } |
| |
| |
| Future<http::Response> ResourceProviderManager::api( |
| const http::Request& request, |
| const Option<Principal>& principal) const |
| { |
| return dispatch( |
| process.get(), |
| &ResourceProviderManagerProcess::api, |
| request, |
| principal); |
| } |
| |
| |
| void ResourceProviderManager::applyOperation( |
| const ApplyOperationMessage& message) const |
| { |
| return dispatch( |
| process.get(), |
| &ResourceProviderManagerProcess::applyOperation, |
| message); |
| } |
| |
| |
| void ResourceProviderManager::acknowledgeOperationStatus( |
| const AcknowledgeOperationStatusMessage& message) const |
| { |
| return dispatch( |
| process.get(), |
| &ResourceProviderManagerProcess::acknowledgeOperationStatus, |
| message); |
| } |
| |
| |
| void ResourceProviderManager::reconcileOperations( |
| const ReconcileOperationsMessage& message) const |
| { |
| return dispatch( |
| process.get(), |
| &ResourceProviderManagerProcess::reconcileOperations, |
| message); |
| } |
| |
| |
| Future<Nothing> ResourceProviderManager::publishResources( |
| const Resources& resources) |
| { |
| return dispatch( |
| process.get(), |
| &ResourceProviderManagerProcess::publishResources, |
| resources); |
| } |
| |
| |
| Queue<ResourceProviderMessage> ResourceProviderManager::messages() const |
| { |
| return process->messages; |
| } |
| |
| } // namespace internal { |
| } // namespace mesos { |