blob: 85906ea5e1bb3516ef264de22913ce0a3c9c58c5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <mesos/v1/resource_provider/resource_provider.hpp>
#include <process/clock.hpp>
#include <process/gmock.hpp>
#include <process/http.hpp>
#include <stout/lambda.hpp>
#include <stout/protobuf.hpp>
#include <stout/recordio.hpp>
#include <stout/stringify.hpp>
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "slave/slave.hpp"
#include "resource_provider/manager.hpp"
#include "tests/mesos.hpp"
namespace http = process::http;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::v1::resource_provider::Call;
using mesos::v1::resource_provider::Event;
using process::Clock;
using process::Future;
using process::Owned;
using process::http::BadRequest;
using process::http::OK;
using process::http::UnsupportedMediaType;
using std::string;
using testing::Values;
using testing::WithParamInterface;
namespace mesos {
namespace internal {
namespace tests {
// Fixture for the resource provider HTTP API tests. It derives from
// `MesosTest` and from `WithParamInterface<ContentType>`, so that the
// `TEST_P` cases can obtain the content type under test via `GetParam()`.
class ResourceProviderHttpApiTest
: public MesosTest,
public WithParamInterface<ContentType> {};
// The tests are parameterized by the content type of the request: the
// `TEST_P` cases of `ResourceProviderHttpApiTest` are instantiated for
// `ContentType::PROTOBUF` and for `ContentType::JSON`.
INSTANTIATE_TEST_CASE_P(
ContentType,
ResourceProviderHttpApiTest,
Values(ContentType::PROTOBUF, ContentType::JSON));
// Posts a request that carries credentials but no 'Content-Type'
// header and expects a BadRequest response naming the missing header.
TEST_F(ResourceProviderHttpApiTest, NoContentType)
{
  ResourceProviderManager manager;

  // Only the authentication headers are set on this request.
  http::Request post;
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.method = "POST";

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, reply);

  AWAIT_EXPECT_RESPONSE_BODY_EQ(
      "Expecting 'Content-Type' to be present",
      reply);
}
// Posts a body that is well-formed JSON but does not describe a valid
// `Call`, and expects a BadRequest response whose body reports that
// the 'type' field is missing.
TEST_F(ResourceProviderHttpApiTest, ValidJsonButInvalidProtobuf)
{
  // A JSON object with a single field that `Call` validation rejects.
  JSON::Object payload;
  payload.values["string"] = "valid_json";

  http::Request post;
  post.method = "POST";
  post.body = stringify(payload);
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.headers["Content-Type"] = APPLICATION_JSON;
  post.headers["Accept"] = APPLICATION_JSON;

  ResourceProviderManager manager;

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, reply);

  AWAIT_EXPECT_RESPONSE_BODY_EQ(
      "Failed to validate resource_provider::Call: "
      "Expecting 'type' to be present",
      reply);
}
// Posts a body that cannot be parsed for the content type under test
// and expects a BadRequest response with a content-type specific
// error message.
TEST_P(ResourceProviderHttpApiTest, MalformedContent)
{
  const ContentType contentType = GetParam();

  ResourceProviderManager manager;

  http::Request post;
  post.method = "POST";
  post.body = "MALFORMED_CONTENT";
  post.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  post.headers["Content-Type"] = stringify(contentType);
  post.headers["Accept"] = stringify(contentType);

  Future<http::Response> reply = manager.api(post, None());

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, reply);

  // The error text depends on which parser rejected the body.
  switch (contentType) {
    case ContentType::JSON: {
      AWAIT_EXPECT_RESPONSE_BODY_EQ(
          "Failed to parse body into JSON: "
          "syntax error at line 1 near: MALFORMED_CONTENT",
          reply);
      break;
    }
    case ContentType::PROTOBUF: {
      AWAIT_EXPECT_RESPONSE_BODY_EQ(
          "Failed to parse body into Call protobuf",
          reply);
      break;
    }
    case ContentType::RECORDIO: {
      // Not among the instantiated parameter values; no body check.
      break;
    }
  }
}
// Sends a SUBSCRIBE call whose body is serialized with the content
// type under test while the 'Content-Type' header names an unknown
// media type, and expects an UnsupportedMediaType response.
TEST_P(ResourceProviderHttpApiTest, UnsupportedContentMediaType)
{
  Call call;
  call.set_type(Call::SUBSCRIBE);

  Call::Subscribe* subscribe = call.mutable_subscribe();

  mesos::v1::ResourceProviderInfo* info =
    subscribe->mutable_resource_provider_info();

  info->set_type("org.apache.mesos.rp.test");
  info->set_name("test");

  const ContentType contentType = GetParam();
  const string unknownMediaType = "application/unknown-media-type";

  http::Request request;
  request.method = "POST";
  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  request.headers["Accept"] = stringify(contentType);
  request.headers["Content-Type"] = unknownMediaType;
  request.body = serialize(contentType, call);

  ResourceProviderManager manager;

  Future<http::Response> response = manager.api(request, None());

  // The failure message below streams `response->body`, which
  // dereferences the future. Require the future to be ready first so
  // that a failed or timed-out request stops the test here instead of
  // reaching a dereference of a future that is not ready.
  AWAIT_READY(response);

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response)
    << response->body;
}
// Sends a SUBSCRIBE call directly to a `ResourceProviderManager` and
// verifies that the streamed response starts with a SUBSCRIBED event
// carrying a non-empty resource provider id.
TEST_P(ResourceProviderHttpApiTest, Subscribe)
{
  Call call;
  call.set_type(Call::SUBSCRIBE);

  Call::Subscribe* subscribe = call.mutable_subscribe();

  mesos::v1::ResourceProviderInfo* info =
    subscribe->mutable_resource_provider_info();

  info->set_type("org.apache.mesos.rp.test");
  info->set_name("test");

  const ContentType contentType = GetParam();

  http::Request request;
  request.method = "POST";
  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  request.headers["Accept"] = stringify(contentType);
  request.headers["Content-Type"] = stringify(contentType);
  request.body = serialize(contentType, call);

  ResourceProviderManager manager;

  Future<http::Response> response = manager.api(request, None());

  // Everything below dereferences `response` (the streamed failure
  // message as well as the `type` and `reader` accesses). Require the
  // future to be ready first so that a failed or timed-out request
  // stops the test here instead of reaching a dereference of a future
  // that is not ready.
  AWAIT_READY(response);

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body;
  ASSERT_EQ(http::Response::PIPE, response->type);

  Option<http::Pipe::Reader> reader = response->reader;
  ASSERT_SOME(reader);

  // Decode `Event`s from the response pipe, deserializing each record
  // with the content type under test.
  recordio::Reader<Event> responseDecoder(
      ::recordio::Decoder<Event>(
          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
      reader.get());

  Future<Result<Event>> event = responseDecoder.read();
  AWAIT_READY(event);
  ASSERT_SOME(event.get());

  // Check event type is subscribed and the resource provider id is set.
  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
  ASSERT_NE("", event->get().subscribed().provider_id().value());
}
// This test starts a master and an agent, posts a SUBSCRIBE call
// directly to the agent's 'api/v1/resource_provider' endpoint and
// verifies that the streamed response starts with a SUBSCRIBED event
// carrying a non-empty resource provider id.
TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
{
  Try<Owned<cluster::Master>> master = StartMaster();
  ASSERT_SOME(master);

  // Set up the expectation before starting the agent so the dispatch
  // of `Slave::__recover` can be awaited below.
  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);

  Owned<MasterDetector> detector = master.get()->createDetector();

  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
  ASSERT_SOME(agent);

  AWAIT_READY(__recover);

  // Wait for recovery to be complete.
  Clock::pause();
  Clock::settle();

  Call call;
  call.set_type(Call::SUBSCRIBE);

  Call::Subscribe* subscribe = call.mutable_subscribe();

  mesos::v1::ResourceProviderInfo* info =
    subscribe->mutable_resource_provider_info();

  info->set_type("org.apache.mesos.rp.test");
  info->set_name("test");

  const ContentType contentType = GetParam();

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  Future<http::Response> response = http::streaming::post(
      agent.get()->pid,
      "api/v1/resource_provider",
      headers,
      serialize(contentType, call),
      stringify(contentType));

  // The `type` and `reader` accesses below dereference `response`.
  // Require the future to be ready first so that a failed or timed-out
  // request stops the test here instead of reaching a dereference of a
  // future that is not ready.
  AWAIT_READY(response);

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
  ASSERT_EQ(http::Response::PIPE, response->type);

  Option<http::Pipe::Reader> reader = response->reader;
  ASSERT_SOME(reader);

  // Decode `Event`s from the response pipe, deserializing each record
  // with the content type under test.
  recordio::Reader<Event> responseDecoder(
      ::recordio::Decoder<Event>(
          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
      reader.get());

  Future<Result<Event>> event = responseDecoder.read();
  AWAIT_READY(event);
  ASSERT_SOME(event.get());

  // Check event type is subscribed and the resource provider id is set.
  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
  ASSERT_NE("", event->get().subscribed().provider_id().value());
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {