// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include <mesos/http.hpp>
#include <mesos/resources.hpp>

#include <mesos/state/in_memory.hpp>
#include <mesos/state/leveldb.hpp>
#include <mesos/state/state.hpp>

#include <mesos/v1/mesos.hpp>

#include <mesos/v1/resource_provider/resource_provider.hpp>

#include <process/clock.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>

#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/gtest.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/protobuf.hpp>
#include <stout/recordio.hpp>
#include <stout/result.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>

#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/recordio.hpp"

#include "internal/devolve.hpp"

#include "master/detector/standalone.hpp"

#include "resource_provider/manager.hpp"
#include "resource_provider/registrar.hpp"

#include "slave/slave.hpp"

#include "tests/mesos.hpp"
#include "tests/utils.hpp"

namespace http = process::http;

using mesos::internal::slave::Slave;

using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;

using mesos::state::InMemoryStorage;
using mesos::state::State;
using mesos::state::Storage;

using mesos::resource_provider::AdmitResourceProvider;
using mesos::resource_provider::Registrar;
using mesos::resource_provider::RemoveResourceProvider;

using mesos::v1::resource_provider::Call;
using mesos::v1::resource_provider::Event;

using process::Clock;
using process::Future;
using process::Message;
using process::Owned;

using process::http::Accepted;
using process::http::BadRequest;
using process::http::OK;
using process::http::UnsupportedMediaType;

using std::string;
using std::vector;

using testing::DoAll;
using testing::Eq;
using testing::Invoke;
using testing::SaveArg;
using testing::Values;
using testing::WithParamInterface;

namespace mesos {
namespace internal {
namespace tests {

// Fixture for the resource provider manager HTTP API tests. Besides
// `MesosTest` it derives from `WithParamInterface<ContentType>`, so
// `TEST_P` cases can obtain a `ContentType` through `GetParam()`.
class ResourceProviderManagerHttpApiTest
  : public MesosTest,
    public WithParamInterface<ContentType>
{
};


// The tests are parameterized by the content type of the request:
// the `TEST_P` cases of `ResourceProviderManagerHttpApiTest` are
// instantiated with `ContentType::PROTOBUF` and `ContentType::JSON`.
INSTANTIATE_TEST_CASE_P(
    ContentType,
    ResourceProviderManagerHttpApiTest,
    Values(ContentType::PROTOBUF, ContentType::JSON));


// Verifies that a request without a 'Content-Type' header is answered
// with a `BadRequest` whose body names the missing header.
TEST_F(ResourceProviderManagerHttpApiTest, NoContentType)
{
  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  // Only the authentication headers are set; 'Content-Type' is
  // deliberately left out.
  http::Request post;
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.method = "POST";

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, reply);
  AWAIT_EXPECT_RESPONSE_BODY_EQ(
      "Expecting 'Content-Type' to be present",
      reply);
}


// Posts a body that is well-formed JSON but does not describe a valid
// `Call` and expects the manager to reject it with a `BadRequest`.
TEST_F(ResourceProviderManagerHttpApiTest, ValidJsonButInvalidProtobuf)
{
  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  // Syntactically valid JSON that carries no 'type' field.
  JSON::Object payload;
  payload.values["string"] = "valid_json";

  http::Request post;
  post.method = "POST";
  post.body = stringify(payload);
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.headers["Content-Type"] = APPLICATION_JSON;
  post.headers["Accept"] = APPLICATION_JSON;

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, reply);
  AWAIT_EXPECT_RESPONSE_BODY_EQ(
      "Failed to validate resource_provider::Call: "
      "Expecting 'type' to be present",
      reply);
}


// Posts a body that cannot be parsed in the announced content type and
// expects a `BadRequest` whose body reports the parse failure.
TEST_P(ResourceProviderManagerHttpApiTest, MalformedContent)
{
  const ContentType contentType = GetParam();
  const string mediaType = stringify(contentType);

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  http::Request post;
  post.method = "POST";
  post.body = "MALFORMED_CONTENT";
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.headers["Content-Type"] = mediaType;
  post.headers["Accept"] = mediaType;

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, reply);

  // The expected error text differs per content type.
  switch (contentType) {
    case ContentType::JSON:
      AWAIT_EXPECT_RESPONSE_BODY_EQ(
          "Failed to parse body into JSON: "
          "syntax error at line 1 near: MALFORMED_CONTENT",
          reply);
      break;
    case ContentType::PROTOBUF:
      AWAIT_EXPECT_RESPONSE_BODY_EQ(
          "Failed to parse body into Call protobuf",
          reply);
      break;
    case ContentType::RECORDIO:
      // No response body is checked for this content type.
      break;
  }
}


// Posts a `SUBSCRIBE` call whose 'Content-Type' header names an unknown
// media type and expects an `UnsupportedMediaType` response.
TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
{
  const ContentType contentType = GetParam();

  // Build the `SUBSCRIBE` call that is serialized into the body.
  Call subscribeCall;
  subscribeCall.set_type(Call::SUBSCRIBE);

  mesos::v1::ResourceProviderInfo* providerInfo =
    subscribeCall.mutable_subscribe()->mutable_resource_provider_info();

  providerInfo->set_name("test");
  providerInfo->set_type("org.apache.mesos.rp.test");

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  // The body is serialized with the test's content type, but the
  // 'Content-Type' header announces a different, unknown media type.
  http::Request post;
  post.method = "POST";
  post.body = serialize(contentType, subscribeCall);
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.headers["Content-Type"] = "application/unknown-media-type";
  post.headers["Accept"] = stringify(contentType);

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, reply);
}


// Subscribes a resource provider and then sends an `UPDATE_STATE` call.
// The call is expected to be answered with `Accepted` and to produce an
// `UPDATE_STATE` message on the manager's message queue that carries
// the provider's ID and the resources sent in the call.
TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
{
  const ContentType contentType = GetParam();

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  Option<id::UUID> streamId;
  Option<mesos::v1::ResourceProviderID> resourceProviderId;

  // First, subscribe to the manager to get the ID.
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();

    mesos::v1::ResourceProviderInfo* info =
      subscribe->mutable_resource_provider_info();

    info->set_type("org.apache.mesos.rp.test");
    info->set_name("test");

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.body = serialize(contentType, call);

    Future<http::Response> response = manager.api(request, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
    ASSERT_EQ(http::Response::PIPE, response->type);

    // The stream ID from the subscription response has to be sent
    // along with the follow-up call below.
    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
    Try<id::UUID> uuid =
      id::UUID::fromString(response->headers.at("Mesos-Stream-Id"));

    // Use a test assertion rather than `CHECK_SOME` so that a malformed
    // stream ID fails this test instead of aborting the test binary.
    ASSERT_SOME(uuid);
    streamId = uuid.get();

    Future<ResourceProviderMessage> message = manager.messages().get();
    AWAIT_READY(message);
    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
    ASSERT_TRUE(message->subscribe->info.has_id());
    resourceProviderId = evolve(message->subscribe->info.id());
  }

  // Then, update the total resources to the manager.
  {
    vector<v1::Resource> resources =
      v1::Resources::fromString("disk:4").get();
    foreach (v1::Resource& resource, resources) {
      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
    }

    Call call;
    call.set_type(Call::UPDATE_STATE);
    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());

    Call::UpdateState* updateState = call.mutable_update_state();

    updateState->mutable_resources()->CopyFrom(v1::Resources(resources));
    updateState->mutable_resource_version_uuid()->CopyFrom(
        evolve(protobuf::createUUID()));

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
    request.body = serialize(contentType, call);

    Future<http::Response> response = manager.api(request, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);

    // The manager will send out a message informing its subscriber
    // about the newly added resources.
    Future<ResourceProviderMessage> message = manager.messages().get();

    AWAIT_READY(message);

    EXPECT_EQ(ResourceProviderMessage::Type::UPDATE_STATE, message->type);
    EXPECT_EQ(
        devolve(resourceProviderId.get()),
        message->updateState->resourceProviderId);
    EXPECT_EQ(devolve(resources), message->updateState->totalResources);
  }
}


// Subscribes a resource provider and then sends an
// `UPDATE_OPERATION_STATUS` call. The call is expected to be answered
// with `Accepted` and to produce an `UPDATE_OPERATION_STATUS` message
// on the manager's message queue that carries the framework ID, the
// operation state and the operation UUID sent in the call.
TEST_P(ResourceProviderManagerHttpApiTest, UpdateOperationStatus)
{
  const ContentType contentType = GetParam();

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  Option<id::UUID> streamId;
  Option<mesos::v1::ResourceProviderID> resourceProviderId;

  // First, subscribe to the manager to get the ID.
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();

    mesos::v1::ResourceProviderInfo* info =
      subscribe->mutable_resource_provider_info();

    info->set_type("org.apache.mesos.rp.test");
    info->set_name("test");

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.body = serialize(contentType, call);

    Future<http::Response> response = manager.api(request, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
    ASSERT_EQ(http::Response::PIPE, response->type);

    // The stream ID from the subscription response has to be sent
    // along with the follow-up call below.
    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
    Try<id::UUID> uuid =
      id::UUID::fromString(response->headers.at("Mesos-Stream-Id"));

    // Use a test assertion rather than `CHECK_SOME` so that a malformed
    // stream ID fails this test instead of aborting the test binary.
    ASSERT_SOME(uuid);
    streamId = uuid.get();

    Future<ResourceProviderMessage> message = manager.messages().get();
    AWAIT_READY(message);
    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
    ASSERT_TRUE(message->subscribe->info.has_id());
    resourceProviderId = evolve(message->subscribe->info.id());
  }

  ASSERT_SOME(resourceProviderId);

  // Then, send an operation status update to the manager.
  {
    v1::FrameworkID frameworkId;
    frameworkId.set_value("foo");

    mesos::v1::OperationStatus status;
    status.set_state(mesos::v1::OperationState::OPERATION_FINISHED);
    status.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());

    mesos::v1::UUID operationUUID = evolve(protobuf::createUUID());

    Call call;
    call.set_type(Call::UPDATE_OPERATION_STATUS);
    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());

    Call::UpdateOperationStatus* updateOperationStatus =
      call.mutable_update_operation_status();
    updateOperationStatus->mutable_framework_id()->CopyFrom(frameworkId);
    updateOperationStatus->mutable_status()->CopyFrom(status);
    updateOperationStatus->mutable_operation_uuid()->CopyFrom(operationUUID);

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
    request.body = serialize(contentType, call);

    Future<http::Response> response = manager.api(request, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);

    // The manager will send out a message informing its subscriber
    // about the updated operation.
    Future<ResourceProviderMessage> message = manager.messages().get();

    AWAIT_READY(message);

    ASSERT_EQ(
        ResourceProviderMessage::Type::UPDATE_OPERATION_STATUS,
        message->type);
    EXPECT_EQ(
        devolve(frameworkId),
        message->updateOperationStatus->update.framework_id());
    EXPECT_EQ(
        devolve(status).state(),
        message->updateOperationStatus->update.status().state());
    EXPECT_EQ(
        operationUUID.value(),
        message->updateOperationStatus->update.operation_uuid().value());
  }
}


// This test verifies that the pending future returned by
// `ResourceProviderManager::publishResources()` becomes ready when the manager
// receives a publish status update with an `OK` status.
TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
{
  const ContentType contentType = GetParam();

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  Option<id::UUID> streamId;
  Option<mesos::v1::ResourceProviderID> resourceProviderId;
  Owned<recordio::Reader<Event>> responseDecoder;

  // First, subscribe to the manager to get the ID.
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();

    mesos::v1::ResourceProviderInfo* info =
      subscribe->mutable_resource_provider_info();

    info->set_type("org.apache.mesos.rp.test");
    info->set_name("test");

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.body = serialize(contentType, call);

    Future<http::Response> response = manager.api(request, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
    ASSERT_EQ(http::Response::PIPE, response->type);

    // The stream ID from the subscription response has to be sent
    // along with the follow-up call below.
    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
    Try<id::UUID> uuid =
      id::UUID::fromString(response->headers.at("Mesos-Stream-Id"));

    // Use a test assertion rather than `CHECK_SOME` so that a malformed
    // stream ID fails this test instead of aborting the test binary.
    ASSERT_SOME(uuid);
    streamId = uuid.get();

    Option<http::Pipe::Reader> reader = response->reader;
    ASSERT_SOME(reader);

    // Decode the events the manager writes to the response pipe.
    responseDecoder.reset(new recordio::Reader<Event>(
        ::recordio::Decoder<Event>(
            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
        reader.get()));

    Future<Result<Event>> event = responseDecoder->read();
    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    // Check event type is subscribed and the resource provider id is set.
    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());

    resourceProviderId = event->get().subscribed().provider_id();

    EXPECT_FALSE(resourceProviderId->value().empty());
  }

  // Then, update the publish status with `OK`.
  {
    vector<v1::Resource> resources =
      v1::Resources::fromString("disk:4").get();
    foreach (v1::Resource& resource, resources) {
      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
    }

    Future<Nothing> published = manager.publishResources(devolve(resources));

    Future<Result<Event>> event = responseDecoder->read();
    AWAIT_READY(event);
    ASSERT_SOME(event.get());
    ASSERT_EQ(Event::PUBLISH_RESOURCES, event->get().type());

    Call call;
    call.set_type(Call::UPDATE_PUBLISH_RESOURCES_STATUS);
    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());

    // Answer with the UUID taken from the received publish event.
    Call::UpdatePublishResourcesStatus* update =
      call.mutable_update_publish_resources_status();
    update->mutable_uuid()->CopyFrom(event->get().publish_resources().uuid());
    update->set_status(Call::UpdatePublishResourcesStatus::OK);

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
    request.body = serialize(contentType, call);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
        Accepted().status,
        manager.api(request, None()));

    // The manager should satisfy the future.
    AWAIT_READY(published);
  }
}


// This test verifies that the pending future returned by
// `ResourceProviderManager::publishResources()` becomes failed when the manager
// receives a publish status update with a `FAILED` status.
TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
{
  const ContentType contentType = GetParam();

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  Option<id::UUID> streamId;
  Option<mesos::v1::ResourceProviderID> resourceProviderId;
  Owned<recordio::Reader<Event>> responseDecoder;

  // First, subscribe to the manager to get the ID.
  {
    Call call;
    call.set_type(Call::SUBSCRIBE);

    Call::Subscribe* subscribe = call.mutable_subscribe();

    mesos::v1::ResourceProviderInfo* info =
      subscribe->mutable_resource_provider_info();

    info->set_type("org.apache.mesos.rp.test");
    info->set_name("test");

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.body = serialize(contentType, call);

    Future<http::Response> response = manager.api(request, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
    ASSERT_EQ(http::Response::PIPE, response->type);

    // The stream ID from the subscription response has to be sent
    // along with the follow-up call below.
    ASSERT_TRUE(response->headers.contains("Mesos-Stream-Id"));
    Try<id::UUID> uuid =
      id::UUID::fromString(response->headers.at("Mesos-Stream-Id"));

    // Use a test assertion rather than `CHECK_SOME` so that a malformed
    // stream ID fails this test instead of aborting the test binary.
    ASSERT_SOME(uuid);
    streamId = uuid.get();

    Option<http::Pipe::Reader> reader = response->reader;
    ASSERT_SOME(reader);

    // Decode the events the manager writes to the response pipe.
    responseDecoder.reset(new recordio::Reader<Event>(
        ::recordio::Decoder<Event>(
            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
        reader.get()));

    Future<Result<Event>> event = responseDecoder->read();
    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    // Check event type is subscribed and the resource provider id is set.
    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());

    resourceProviderId = event->get().subscribed().provider_id();

    EXPECT_FALSE(resourceProviderId->value().empty());
  }

  // Then, update the publish status with `FAILED`.
  {
    vector<v1::Resource> resources =
      v1::Resources::fromString("disk:4").get();
    foreach (v1::Resource& resource, resources) {
      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
    }

    Future<Nothing> published = manager.publishResources(devolve(resources));

    Future<Result<Event>> event = responseDecoder->read();
    AWAIT_READY(event);
    ASSERT_SOME(event.get());
    ASSERT_EQ(Event::PUBLISH_RESOURCES, event->get().type());

    Call call;
    call.set_type(Call::UPDATE_PUBLISH_RESOURCES_STATUS);
    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());

    // Answer with the UUID taken from the received publish event.
    Call::UpdatePublishResourcesStatus* update =
      call.mutable_update_publish_resources_status();
    update->mutable_uuid()->CopyFrom(event->get().publish_resources().uuid());
    update->set_status(Call::UpdatePublishResourcesStatus::FAILED);

    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.headers["Mesos-Stream-Id"] = stringify(streamId.get());
    request.body = serialize(contentType, call);

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
        Accepted().status,
        manager.api(request, None()));

    // The manager should fail the future.
    AWAIT_FAILED(published);
  }
}


// Checks that the future handed out by
// `ResourceProviderManager::publishResources()` fails once the
// resource provider's connection is closed while the publish request
// is still outstanding.
TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
{
  const ContentType contentType = GetParam();

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  Option<mesos::v1::ResourceProviderID> providerId;
  Option<http::Pipe::Reader> pipe;
  Owned<recordio::Reader<Event>> events;

  // Step one: subscribe and learn the resource provider ID.
  {
    Call subscribeCall;
    subscribeCall.set_type(Call::SUBSCRIBE);

    mesos::v1::ResourceProviderInfo* providerInfo =
      subscribeCall.mutable_subscribe()->mutable_resource_provider_info();

    providerInfo->set_name("test");
    providerInfo->set_type("org.apache.mesos.rp.test");

    http::Request post;
    post.method = "POST";
    post.body = serialize(contentType, subscribeCall);
    post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    post.headers["Content-Type"] = stringify(contentType);
    post.headers["Accept"] = stringify(contentType);

    Future<http::Response> reply = manager.api(post, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, reply);
    ASSERT_EQ(http::Response::PIPE, reply->type);

    // Keep the pipe reader around so that it can be closed later on.
    pipe = reply->reader;
    ASSERT_SOME(pipe);

    events.reset(new recordio::Reader<Event>(
        ::recordio::Decoder<Event>(
            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
        pipe.get()));

    Future<Result<Event>> first = events->read();
    AWAIT_READY(first);
    ASSERT_SOME(first.get());

    // The first event has to be `SUBSCRIBED` with a non-empty ID.
    ASSERT_EQ(Event::SUBSCRIBED, first->get().type());

    providerId = first->get().subscribed().provider_id();

    EXPECT_FALSE(providerId->value().empty());
  }

  // Step two: ask for resources to be published and drop the
  // connection as soon as the corresponding event has arrived.
  {
    vector<v1::Resource> disks =
      v1::Resources::fromString("disk:4").get();
    foreach (v1::Resource& disk, disks) {
      disk.mutable_provider_id()->CopyFrom(providerId.get());
    }

    Future<Nothing> published = manager.publishResources(devolve(disks));

    Future<Result<Event>> next = events->read();
    AWAIT_READY(next);
    ASSERT_SOME(next.get());
    ASSERT_EQ(Event::PUBLISH_RESOURCES, next->get().type());

    pipe->close();

    // Without a connection the manager is expected to fail the future.
    AWAIT_FAILED(published);
  }
}


// Starts a master and an agent and subscribes a resource provider
// directly against the agent's 'api/v1/resource_provider' endpoint.
TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
{
  const ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  // The agent's resource provider manager only starts once the agent
  // has been assigned an agent ID, so wait for the registration.
  Future<SlaveRegisteredMessage> registered =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  AWAIT_READY(registered);

  // Let recovery run to completion.
  Clock::pause();
  Clock::settle();

  // Build the `SUBSCRIBE` call that is posted to the agent.
  Call subscribeCall;
  subscribeCall.set_type(Call::SUBSCRIBE);

  mesos::v1::ResourceProviderInfo* providerInfo =
    subscribeCall.mutable_subscribe()->mutable_resource_provider_info();

  providerInfo->set_name("test");
  providerInfo->set_type("org.apache.mesos.rp.test");

  http::Headers requestHeaders = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  requestHeaders["Accept"] = stringify(contentType);

  Future<http::Response> reply = http::streaming::post(
      agent.get()->pid,
      "api/v1/resource_provider",
      requestHeaders,
      serialize(contentType, subscribeCall),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, reply);
  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", reply);
  ASSERT_EQ(http::Response::PIPE, reply->type);

  Option<http::Pipe::Reader> pipe = reply->reader;
  ASSERT_SOME(pipe);

  recordio::Reader<Event> events(
      ::recordio::Decoder<Event>(
          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
      pipe.get());

  Future<Result<Event>> first = events.read();
  AWAIT_READY(first);
  ASSERT_SOME(first.get());

  // The first event has to be `SUBSCRIBED` and carry a non-empty
  // resource provider ID.
  EXPECT_EQ(Event::SUBSCRIBED, first->get().type());
  EXPECT_FALSE(first->get().subscribed().provider_id().value().empty());
}


// Fixture for the resource provider registrar tests. It adds no
// members of its own on top of `MesosTest`.
class ResourceProviderRegistrarTest : public tests::MesosTest
{
};


// Test that the generic resource provider registrar works as expected.
//
// TODO(bbannier): Enable this test on Windows once MESOS-5932 is resolved.
#ifndef __WINDOWS__
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar)
{
  using ProviderRegistry = mesos::resource_provider::registry::Registry;
  using ProviderRecord = mesos::resource_provider::registry::ResourceProvider;

  ProviderRecord expected;
  expected.set_type("org.apache.mesos.rp.test");
  expected.set_name("bar");
  expected.mutable_id()->set_value("foo");

  // Phase one: admit and remove the resource provider. The registrar
  // is backed by LevelDB storage in the sandbox so that the resulting
  // state can be recovered in phase two.
  {
    Owned<mesos::state::Storage> leveldb(
        new mesos::state::LevelDBStorage(sandbox.get()));
    Try<Owned<Registrar>> created = Registrar::create(std::move(leveldb));

    ASSERT_SOME_NE(Owned<Registrar>(nullptr), created);

    Future<ProviderRegistry> recovered = created.get()->recover();
    AWAIT_READY(recovered);
    EXPECT_TRUE(recovered->removed_resource_providers().empty());

    // The first admission of the resource provider succeeds.
    Future<bool> admitted = created.get()->apply(
        Owned<Registrar::Operation>(new AdmitResourceProvider(expected)));
    AWAIT_READY(admitted);
    EXPECT_TRUE(admitted.get());

    // Resubscribing with a different type (or name) is rejected.
    ProviderRecord retyped = expected;
    retyped.set_type("org.apache.mesos.rp.test2");

    admitted = created.get()->apply(
        Owned<Registrar::Operation>(new AdmitResourceProvider(retyped)));
    AWAIT_READY(admitted);
    EXPECT_FALSE(admitted.get());

    Future<bool> removed = created.get()->apply(
        Owned<Registrar::Operation>(
            new RemoveResourceProvider(expected.id())));
    AWAIT_READY(removed);
    EXPECT_TRUE(removed.get());

    // Once removed, the resource provider cannot be admitted again.
    admitted = created.get()->apply(
        Owned<Registrar::Operation>(new AdmitResourceProvider(expected)));
    AWAIT_READY(admitted);
    EXPECT_FALSE(admitted.get());
  }

  // Phase two: recover from the same storage location and check that
  // the resource provider is recorded as removed.
  {
    Owned<mesos::state::Storage> leveldb(
        new mesos::state::LevelDBStorage(sandbox.get()));
    Try<Owned<Registrar>> created = Registrar::create(std::move(leveldb));

    ASSERT_SOME_NE(Owned<Registrar>(nullptr), created);

    Future<ProviderRegistry> recovered = created.get()->recover();
    AWAIT_READY(recovered);

    EXPECT_TRUE(recovered->resource_providers().empty());
    ASSERT_EQ(1, recovered->removed_resource_providers_size());

    const ProviderRecord& gone = recovered->removed_resource_providers(0);

    EXPECT_EQ(expected, gone);
  }
}
#endif // __WINDOWS__


// Test that the master resource provider registrar works as expected.
//
// TODO(bbannier): Enable this test on Windows once MESOS-5932 is resolved.
#ifndef __WINDOWS__
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar)
{
  using ProviderRegistry = mesos::resource_provider::registry::Registry;
  using ProviderRecord = mesos::resource_provider::registry::ResourceProvider;

  ProviderRecord expected;
  expected.set_type("org.apache.mesos.rp.test");
  expected.set_name("bar");
  expected.mutable_id()->set_value("foo");

  // Phase one: admit and remove the resource provider through a
  // registrar layered on top of the master registrar. The state is
  // kept in LevelDB storage in the sandbox so that it can be recovered
  // in phase two.
  {
    mesos::state::LevelDBStorage leveldb(sandbox.get());
    State state(&leveldb);
    master::Registrar masterRegistrar(CreateMasterFlags(), &state);

    const MasterInfo info = protobuf::createMasterInfo({});

    Future<Registry> masterRegistry = masterRegistrar.recover(info);
    AWAIT_READY(masterRegistry);

    Try<Owned<Registrar>> created = Registrar::create(
        &masterRegistrar,
        masterRegistry->resource_provider_registry());

    ASSERT_SOME_NE(Owned<Registrar>(nullptr), created);

    // The first admission of the resource provider succeeds.
    Future<bool> admitted = created.get()->apply(
        Owned<Registrar::Operation>(new AdmitResourceProvider(expected)));
    AWAIT_READY(admitted);
    EXPECT_TRUE(admitted.get());

    // Resubscribing with a different type (or name) is rejected.
    ProviderRecord retyped = expected;
    retyped.set_type("org.apache.mesos.rp.test2");

    admitted = created.get()->apply(
        Owned<Registrar::Operation>(new AdmitResourceProvider(retyped)));
    AWAIT_READY(admitted);
    EXPECT_FALSE(admitted.get());

    Future<bool> removed = created.get()->apply(
        Owned<Registrar::Operation>(
            new RemoveResourceProvider(expected.id())));
    AWAIT_READY(removed);
    EXPECT_TRUE(removed.get());

    // Once removed, the resource provider cannot be admitted again.
    admitted = created.get()->apply(
        Owned<Registrar::Operation>(new AdmitResourceProvider(expected)));
    AWAIT_READY(admitted);
    EXPECT_FALSE(admitted.get());
  }

  // Phase two: recover from the same storage location and check that
  // the resource provider is recorded as removed.
  {
    mesos::state::LevelDBStorage leveldb(sandbox.get());
    State state(&leveldb);
    master::Registrar masterRegistrar(CreateMasterFlags(), &state);

    const MasterInfo info = protobuf::createMasterInfo({});

    Future<Registry> masterRegistry = masterRegistrar.recover(info);
    AWAIT_READY(masterRegistry);

    Try<Owned<Registrar>> created = Registrar::create(
        &masterRegistrar,
        masterRegistry->resource_provider_registry());

    ASSERT_SOME_NE(Owned<Registrar>(nullptr), created);

    Future<ProviderRegistry> recovered = created.get()->recover();
    AWAIT_READY(recovered);

    EXPECT_TRUE(recovered->resource_providers().empty());
    ASSERT_EQ(1, recovered->removed_resource_providers_size());

    const ProviderRecord& gone = recovered->removed_resource_providers(0);

    EXPECT_EQ(expected, gone);
  }
}
#endif // __WINDOWS__


// Test that resource provider resources are offered to frameworks,
// that frameworks can accept the offer with operations which have a
// resource provider convert resources, and that the converted
// resources are offered to frameworks as well.
//
// The resource provider is started with the parameterized content
// type while the scheduler always uses `ContentType::PROTOBUF`.
TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
{
  // Start master and agent.
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  // Intercept the first `UpdateSlaveMessage`; it is awaited below
  // after the agent has been started.
  Future<UpdateSlaveMessage> updateSlaveMessage =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  // Pause the clock and control it manually in order to
  // control the timing of the registration. A registration timeout
  // would trigger multiple registration attempts. As a result, multiple
  // 'UpdateSlaveMessage' would be sent, which we need to avoid to
  // ensure that the second 'UpdateSlaveMessage' is a result of the
  // resource provider registration.
  Clock::pause();

  slave::Flags slaveFlags = CreateSlaveFlags();

  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(agent);

  Clock::advance(slaveFlags.registration_backoff_factor);
  Clock::settle();
  AWAIT_READY(updateSlaveMessage);

  // The disk resource the resource provider is started with.
  v1::Resource disk = v1::createDiskResource(
      "200", "*", None(), None(), v1::createDiskSourceRaw(None(), "profile"));

  // Intercept the next `UpdateSlaveMessage`; it is awaited below once
  // the resource provider has been started.
  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  // The remainder of the test does not advance the clock manually.
  Clock::resume();

  mesos::v1::ResourceProviderInfo resourceProviderInfo;
  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
  resourceProviderInfo.set_name("test");

  v1::MockResourceProvider resourceProvider(
      resourceProviderInfo, Some(v1::Resources(disk)));

  // Start and register a resource provider.
  Owned<EndpointDetector> endpointDetector(
      resource_provider::createEndpointDetector(agent.get()->pid));

  const ContentType contentType = GetParam();

  resourceProvider.start(std::move(endpointDetector), contentType);

  // Wait until the agent's resources have been updated to include the
  // resource provider resources.
  AWAIT_READY(updateSlaveMessage);

  // Start and register a framework.
  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.set_roles(0, "role");

  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  // Capture the first offers event sent to the framework.
  Future<v1::scheduler::Event::Offers> offers1;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers1));

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      ContentType::PROTOBUF,
      scheduler);

  AWAIT_READY(subscribed);
  v1::FrameworkID frameworkId(subscribed->framework_id());

  // Resource provider resources will be offered to the framework.
  AWAIT_READY(offers1);
  ASSERT_FALSE(offers1->offers().empty());

  const v1::Offer& offer1 = offers1->offers(0);

  // Keep only the offered resources which carry a provider ID.
  v1::Resources resources =
    v1::Resources(offer1.resources()).filter([](const v1::Resource& resource) {
      return resource.has_provider_id();
    });

  ASSERT_FALSE(resources.empty());

  // Take the first of these resources and add a dynamic reservation
  // for the framework's role to it.
  v1::Resource reserved = *(resources.begin());
  reserved.add_reservations()->CopyFrom(
      v1::createDynamicReservationInfo(
          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));

  // Capture the next offers event sent after the offer was accepted.
  Future<v1::scheduler::Event::Offers> offers2;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers2))
    .WillRepeatedly(Return()); // Ignore subsequent offers.

  // Accept the offer with a `RESERVE` operation followed by a
  // `CREATE_DISK` operation targeting a `MOUNT` disk.
  mesos.send(
      v1::createCallAccept(
          frameworkId,
          offer1,
          {v1::RESERVE(reserved),
           v1::CREATE_DISK(reserved, v1::Resource::DiskInfo::Source::MOUNT)}));

  // The converted resource should be offered to the framework.
  AWAIT_READY(offers2);
  ASSERT_FALSE(offers2->offers().empty());

  const v1::Offer& offer2 = offers2->offers(0);

  // Remember the last resource in the second offer which carries a
  // provider ID.
  Option<v1::Resource> mountDisk;
  foreach (const v1::Resource& resource, offer2.resources()) {
    if (resource.has_provider_id()) {
      mountDisk = resource;
    }
  }

  // The resource is expected to be a reserved `MOUNT` disk.
  ASSERT_SOME(mountDisk);
  EXPECT_EQ(
      v1::Resource::DiskInfo::Source::MOUNT, mountDisk->disk().source().type());
  EXPECT_FALSE(mountDisk->reservations().empty());
}


// Test that a resource provider can resubscribe with an agent after
// a resource provider failover as well as after an agent failover.
//
// TODO(bbannier): This test is currently disabled on Windows as the resource
// provider manager uses a LevelDB store which is at the moment not supported on
// Windows (see MESOS-5932) and we use an in-memory store. The in-memory store
// does not survive agent restarts so that resubscription attempts are rejected
// (resource provider is unknown).
TEST_P_TEMP_DISABLED_ON_WINDOWS(
    ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
{
  Clock::pause();

  // Start master and agent.
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Future<UpdateSlaveMessage> updateSlaveMessage =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  slave::Flags slaveFlags = CreateSlaveFlags();

  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(agent);

  Clock::advance(slaveFlags.registration_backoff_factor);
  Clock::settle();
  AWAIT_READY(updateSlaveMessage);

  // Intercept the next `UpdateSlaveMessage`; it is awaited below once
  // the resource provider has been started.
  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  mesos::v1::ResourceProviderInfo resourceProviderInfo;
  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
  resourceProviderInfo.set_name("test");

  v1::Resource disk = v1::createDiskResource(
      "200", "*", None(), None(), v1::createDiskSourceRaw());

  Owned<v1::MockResourceProvider> resourceProvider(
      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));

  // Start and register a resource provider.
  Owned<EndpointDetector> endpointDetector(
      resource_provider::createEndpointDetector(agent.get()->pid));

  const ContentType contentType = GetParam();

  resourceProvider->start(std::move(endpointDetector), contentType);

  // Wait until the agent's resources have been updated to include the
  // resource provider resources. At this point the resource provider
  // will have an ID assigned by the agent.
  AWAIT_READY(updateSlaveMessage);

  ASSERT_TRUE(resourceProvider->info.has_id());

  // Keep the info including the assigned ID for the resubscriptions.
  resourceProviderInfo = resourceProvider->info;

  // Resource provider failover by opening a new connection.
  // The assigned resource provider ID will be used to resubscribe.
  resourceProvider.reset(
      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));

  Future<Event::Subscribed> subscribed1;
  EXPECT_CALL(*resourceProvider, subscribed(_))
    .WillOnce(FutureArg<0>(&subscribed1));

  endpointDetector =
    resource_provider::createEndpointDetector(agent.get()->pid);
  resourceProvider->start(std::move(endpointDetector), contentType);

  AWAIT_READY(subscribed1);
  EXPECT_EQ(resourceProviderInfo.id(), subscribed1->provider_id());

  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);

  // We terminate the resource provider once we have confirmed that it
  // got disconnected. This prevents it from resubscribing on its own
  // and thereby racing with the newly created resource provider.
  Future<Nothing> disconnected;
  EXPECT_CALL(*resourceProvider, disconnected())
    .WillOnce(DoAll(
        Invoke([&resourceProvider]() { resourceProvider.reset(); }),
        FutureSatisfy(&disconnected)))
    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`.

  // The agent failover.
  agent->reset();

  AWAIT_READY(disconnected);

  // Restart the agent. Its PID is dereferenced below, so make sure the
  // restart actually succeeded instead of aborting on an errored `Try`.
  agent = StartSlave(detector.get(), slaveFlags);
  ASSERT_SOME(agent);

  Clock::advance(slaveFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(__recover);

  endpointDetector =
    resource_provider::createEndpointDetector(agent.get()->pid);

  // Resubscribe with the restarted agent using the previously assigned
  // resource provider ID.
  resourceProvider.reset(new v1::MockResourceProvider(
      resourceProviderInfo,
      Some(v1::Resources(disk))));

  Future<Event::Subscribed> subscribed2;
  EXPECT_CALL(*resourceProvider, subscribed(_))
    .WillOnce(FutureArg<0>(&subscribed2));

  resourceProvider->start(std::move(endpointDetector), contentType);

  AWAIT_READY(subscribed2);
  EXPECT_EQ(resourceProviderInfo.id(), subscribed2->provider_id());
}


// Verifies that a resource provider which tries to resubscribe with
// an ID unknown to the agent is disconnected rather than admitted.
TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
{
  Clock::pause();

  // Bring up a master and an agent.
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> masterDetector = master.get()->createDetector();

  slave::Flags agentFlags = CreateSlaveFlags();

  // The agent's resource provider manager is only started once the
  // agent has been assigned an agent ID, so we wait for the agent
  // to be registered.
  Future<SlaveRegisteredMessage> agentRegistered =
    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);

  Try<Owned<cluster::Slave>> agent =
    StartSlave(masterDetector.get(), agentFlags);
  ASSERT_SOME(agent);

  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(agentRegistered);

  // Describe a resource provider which already carries a freshly
  // generated random ID.
  mesos::v1::ResourceProviderInfo providerInfo;
  providerInfo.mutable_id()->set_value(id::UUID::random().toString());
  providerInfo.set_type("org.apache.mesos.rp.test");
  providerInfo.set_name("test");

  Owned<v1::MockResourceProvider> provider(
      new v1::MockResourceProvider(providerInfo));

  // Destroy the resource provider as soon as the expected disconnect
  // is observed; otherwise it would keep resubscribing indefinitely.
  Future<Nothing> providerDisconnected;
  EXPECT_CALL(*provider, disconnected())
    .WillOnce(DoAll(
        Invoke([&provider]() { provider.reset(); }),
        FutureSatisfy(&providerDisconnected)));

  // Point the resource provider at the agent and start it using the
  // parameterized content type.
  Owned<EndpointDetector> agentEndpoint(
      resource_provider::createEndpointDetector(agent.get()->pid));

  const ContentType paramContentType = GetParam();

  provider->start(std::move(agentEndpoint), paramContentType);

  AWAIT_READY(providerDisconnected);
}


// Verifies that the disconnection of a resource provider makes the
// agent send an `UpdateSlaveMessage` to the master which no longer
// lists any resource provider.
TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
{
  Clock::pause();

  // Bring up a master and an agent.
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> masterDetector = master.get()->createDetector();

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  slave::Flags agentFlags = CreateSlaveFlags();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(masterDetector.get(), agentFlags);
  ASSERT_SOME(agent);

  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(agentUpdate);

  // Intercept the update triggered by the resource provider below.
  agentUpdate = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  mesos::v1::ResourceProviderInfo providerInfo;
  providerInfo.set_type("org.apache.mesos.rp.test");
  providerInfo.set_name("test");

  v1::Resource rawDisk = v1::createDiskResource(
      "200", "*", None(), None(), v1::createDiskSourceRaw());

  Owned<v1::MockResourceProvider> provider(
      new v1::MockResourceProvider(providerInfo, v1::Resources(rawDisk)));

  // Point the resource provider at the agent and start it using the
  // parameterized content type.
  Owned<EndpointDetector> agentEndpoint(
      resource_provider::createEndpointDetector(agent.get()->pid));

  const ContentType paramContentType = GetParam();

  provider->start(std::move(agentEndpoint), paramContentType);

  {
    // Once the agent has reported the resource provider resources,
    // the resource provider has been assigned an ID by the agent.
    AWAIT_READY(agentUpdate);

    ASSERT_TRUE(provider->info.has_id());
    rawDisk.mutable_provider_id()->CopyFrom(provider->info.id());

    // The reported total of the first resource provider has to
    // contain the disk, now tagged with the provider ID.
    const Resources providerTotal =
      agentUpdate->resource_providers().providers(0).total_resources();

    EXPECT_TRUE(providerTotal.contains(devolve(rawDisk)));
  }

  agentUpdate = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  // Destroying the resource provider simulates its disconnection.
  provider.reset();

  AWAIT_READY(agentUpdate);
  ASSERT_TRUE(agentUpdate->has_resource_providers());
  EXPECT_EQ(0, agentUpdate->resource_providers().providers_size());
}


// Verifies that when a second resource provider subscribes with the
// ID of an already connected resource provider, the first instance
// is disconnected and the second subscription is treated as a
// resubscription.
TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
{
  Clock::pause();

  // Bring up a master and an agent.
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> masterDetector = master.get()->createDetector();

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  slave::Flags agentFlags = CreateSlaveFlags();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(masterDetector.get(), agentFlags);
  ASSERT_SOME(agent);

  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::settle();
  AWAIT_READY(agentUpdate);

  mesos::v1::ResourceProviderInfo providerInfo;
  providerInfo.set_type("org.apache.mesos.rp.test");
  providerInfo.set_name("test");

  Owned<v1::MockResourceProvider> firstProvider(
      new v1::MockResourceProvider(providerInfo));

  // Subscribe the first resource provider with the agent.
  Owned<EndpointDetector> agentEndpoint(
      resource_provider::createEndpointDetector(agent.get()->pid));

  Future<Event::Subscribed> firstSubscribed;
  EXPECT_CALL(*firstProvider, subscribed(_))
    .WillOnce(FutureArg<0>(&firstSubscribed));

  firstProvider->start(std::move(agentEndpoint), ContentType::PROTOBUF);

  AWAIT_READY(firstSubscribed);

  // Reuse the ID handed to the first instance for the second one.
  providerInfo.mutable_id()->CopyFrom(firstSubscribed->provider_id());

  // A second resource provider subscribing with the same ID will
  // cause the first instance to be disconnected; its subscription
  // is then handled as a resubscription.
  Owned<v1::MockResourceProvider> secondProvider(
      new v1::MockResourceProvider(providerInfo));

  // Destroy the first resource provider as soon as its disconnection
  // is confirmed so that it cannot resubscribe and race with the
  // second resource provider.
  Future<Nothing> firstDisconnected;
  EXPECT_CALL(*firstProvider, disconnected())
    .WillOnce(DoAll(
        Invoke([&firstProvider]() { firstProvider.reset(); }),
        FutureSatisfy(&firstDisconnected)))
    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`.

  Future<Event::Subscribed> secondSubscribed;
  EXPECT_CALL(*secondProvider, subscribed(_))
    .WillOnce(FutureArg<0>(&secondSubscribed));

  agentEndpoint =
    resource_provider::createEndpointDetector(agent.get()->pid);
  secondProvider->start(std::move(agentEndpoint), ContentType::PROTOBUF);

  AWAIT_READY(firstDisconnected);
  AWAIT_READY(secondSubscribed);
}


// Verifies that the `resource_provider_manager` metrics reflect the
// subscription and the subsequent disconnection of a resource provider.
TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
{
  // Names of the metrics inspected by this test.
  const std::string subscribedKey = "resource_provider_manager/subscribed";
  const std::string subscribeEventsKey =
    "resource_provider_manager/events/subscribe";
  const std::string disconnectEventsKey =
    "resource_provider_manager/events/disconnect";

  Clock::pause();

  // Bring up a master and an agent.
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  Owned<MasterDetector> masterDetector = master.get()->createDetector();

  Future<UpdateSlaveMessage> agentUpdate =
    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);

  slave::Flags agentFlags = CreateSlaveFlags();

  Try<Owned<cluster::Slave>> agent =
    StartSlave(masterDetector.get(), agentFlags);
  ASSERT_SOME(agent);

  Clock::advance(agentFlags.registration_backoff_factor);
  Clock::settle();

  AWAIT_READY(agentUpdate);

  // Before any resource provider subscribed all metrics are zero.
  JSON::Object initialMetrics = Metrics();

  ASSERT_EQ(1u, initialMetrics.values.count(subscribedKey));
  EXPECT_EQ(0, initialMetrics.values.at(subscribedKey));

  ASSERT_EQ(1u, initialMetrics.values.count(subscribeEventsKey));
  EXPECT_EQ(0, initialMetrics.values.at(subscribeEventsKey));

  ASSERT_EQ(1u, initialMetrics.values.count(disconnectEventsKey));
  EXPECT_EQ(0, initialMetrics.values.at(disconnectEventsKey));

  mesos::v1::ResourceProviderInfo providerInfo;
  providerInfo.set_type("org.apache.mesos.rp.test");
  providerInfo.set_name("test");

  Owned<v1::MockResourceProvider> provider(
      new v1::MockResourceProvider(providerInfo));

  // Subscribe the resource provider with the agent.
  Owned<EndpointDetector> agentEndpoint(
      resource_provider::createEndpointDetector(agent.get()->pid));

  Future<Event::Subscribed> providerSubscribed;
  EXPECT_CALL(*provider, subscribed(_))
    .WillOnce(FutureArg<0>(&providerSubscribed));

  provider->start(std::move(agentEndpoint), ContentType::PROTOBUF);

  AWAIT_READY(providerSubscribed);

  // With the resource provider subscribed, one subscription and one
  // subscribe event are expected, but no disconnect event yet.
  JSON::Object subscribedMetrics = Metrics();

  ASSERT_EQ(1u, subscribedMetrics.values.count(subscribedKey));
  EXPECT_EQ(1, subscribedMetrics.values.at(subscribedKey));

  ASSERT_EQ(1u, subscribedMetrics.values.count(subscribeEventsKey));
  EXPECT_EQ(1, subscribedMetrics.values.at(subscribeEventsKey));

  ASSERT_EQ(1u, subscribedMetrics.values.count(disconnectEventsKey));
  EXPECT_EQ(0, subscribedMetrics.values.at(disconnectEventsKey));

  // Intercept the next `UpdateSlaveMessage` and destroy the resource
  // provider.
  agentUpdate = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
  provider.reset();

  // Give the resource provider manager the chance to process the
  // disconnection.
  Clock::settle();

  // After the disconnection there is no subscription left and one
  // disconnect event has been recorded.
  JSON::Object disconnectedMetrics = Metrics();

  ASSERT_EQ(1u, disconnectedMetrics.values.count(subscribedKey));
  EXPECT_EQ(0, disconnectedMetrics.values.at(subscribedKey));

  ASSERT_EQ(1u, disconnectedMetrics.values.count(subscribeEventsKey));
  EXPECT_EQ(1, disconnectedMetrics.values.at(subscribeEventsKey));

  ASSERT_EQ(1u, disconnectedMetrics.values.count(disconnectEventsKey));
  EXPECT_EQ(1, disconnectedMetrics.values.at(disconnectEventsKey));
}


// Verifies that removing a subscribed resource provider from the
// manager produces a `REMOVE` message and that a later attempt to
// resubscribe with the removed ID yields no event.
TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider)
{
  const ContentType contentType = ContentType::PROTOBUF;

  // Wraps a resource provider call into an authenticated `POST`
  // request which uses `contentType` for both request and response.
  auto createRequest = [contentType](const Call& call) {
    http::Request request;
    request.method = "POST";
    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
    request.headers["Accept"] = stringify(contentType);
    request.headers["Content-Type"] = stringify(contentType);
    request.body = serialize(contentType, call);
    return request;
  };

  ResourceProviderManager manager(
      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());

  Future<ResourceProviderMessage> managerMessage = manager.messages().get();

  Option<ResourceProviderID> resourceProviderId;

  // Subscribe a resource provider which does not have an ID yet and
  // remember the ID reported in the resulting `SUBSCRIBE` message.
  {
    Call subscribeCall;
    subscribeCall.set_type(Call::SUBSCRIBE);

    mesos::v1::ResourceProviderInfo* providerInfo =
      subscribeCall.mutable_subscribe()->mutable_resource_provider_info();

    providerInfo->set_type("org.apache.mesos.rp.test");
    providerInfo->set_name("test");

    http::Request subscribeRequest = createRequest(subscribeCall);

    Future<http::Response> subscribeResponse =
      manager.api(subscribeRequest, None());
    AWAIT_READY(subscribeResponse);

    AWAIT_READY(managerMessage);
    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, managerMessage->type);
    ASSERT_SOME(managerMessage->subscribe);
    ASSERT_TRUE(managerMessage->subscribe->info.has_id());
    resourceProviderId = managerMessage->subscribe->info.id();
  }

  // Removing the resource provider is expected to be announced with
  // a `REMOVE` message carrying its ID.
  managerMessage = manager.messages().get();

  Future<Nothing> removed =
    manager.removeResourceProvider(resourceProviderId.get());

  AWAIT_READY(removed);

  AWAIT_READY(managerMessage);
  ASSERT_EQ(ResourceProviderMessage::Type::REMOVE, managerMessage->type);
  ASSERT_SOME(managerMessage->remove);
  EXPECT_EQ(
      resourceProviderId.get(), managerMessage->remove->resourceProviderId);

  // Resubscribing with the ID of the removed resource provider fails.
  {
    Call resubscribeCall;
    resubscribeCall.set_type(Call::SUBSCRIBE);

    mesos::v1::ResourceProviderInfo* providerInfo =
      resubscribeCall.mutable_subscribe()->mutable_resource_provider_info();

    providerInfo->set_type("org.apache.mesos.rp.test");
    providerInfo->set_name("test");
    providerInfo->mutable_id()->CopyFrom(evolve(resourceProviderId.get()));

    http::Request resubscribeRequest = createRequest(resubscribeCall);

    Future<http::Response> resubscribeResponse =
      manager.api(resubscribeRequest, None());

    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, resubscribeResponse);
    ASSERT_EQ(http::Response::PIPE, resubscribeResponse->type);

    Option<http::Pipe::Reader> pipeReader = resubscribeResponse->reader;
    ASSERT_SOME(pipeReader);

    recordio::Reader<Event> eventReader(
        ::recordio::Decoder<Event>(
            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
        pipeReader.get());

    // The manager is expected to drop the subscribe call for the
    // removed resource provider, so reading from the response yields
    // no event.
    Future<Result<Event>> firstEvent = eventReader.read();
    AWAIT_READY(firstEvent);
    EXPECT_NONE(firstEvent.get());
  }
}

} // namespace tests {
} // namespace internal {
} // namespace mesos {
