| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <string> |
| |
| #include <mesos/v1/resource_provider/resource_provider.hpp> |
| |
| #include <process/clock.hpp> |
| #include <process/gmock.hpp> |
| #include <process/http.hpp> |
| |
| #include <stout/lambda.hpp> |
| #include <stout/protobuf.hpp> |
| #include <stout/recordio.hpp> |
| #include <stout/stringify.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/recordio.hpp" |
| |
| #include "slave/slave.hpp" |
| |
| #include "resource_provider/manager.hpp" |
| |
| #include "tests/mesos.hpp" |
| |
| namespace http = process::http; |
| |
| using mesos::internal::slave::Slave; |
| |
| using mesos::master::detector::MasterDetector; |
| |
| using mesos::v1::resource_provider::Call; |
| using mesos::v1::resource_provider::Event; |
| |
| using process::Clock; |
| using process::Future; |
| using process::Owned; |
| |
| using process::http::BadRequest; |
| using process::http::OK; |
| using process::http::UnsupportedMediaType; |
| |
| using std::string; |
| |
| using testing::Values; |
| using testing::WithParamInterface; |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
| class ResourceProviderHttpApiTest |
| : public MesosTest, |
| public WithParamInterface<ContentType> {}; |
| |
| |
| // The tests are parameterized by the content type of the request. |
| INSTANTIATE_TEST_CASE_P( |
| ContentType, |
| ResourceProviderHttpApiTest, |
| Values(ContentType::PROTOBUF, ContentType::JSON)); |
| |
| |
| TEST_F(ResourceProviderHttpApiTest, NoContentType) |
| { |
| http::Request request; |
| request.method = "POST"; |
| request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); |
| |
| ResourceProviderManager manager; |
| |
| Future<http::Response> response = manager.api(request, None()); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| AWAIT_EXPECT_RESPONSE_BODY_EQ( |
| "Expecting 'Content-Type' to be present", |
| response); |
| } |
| |
| |
| // This test sends a valid JSON blob that cannot be deserialized |
| // into a valid protobuf resulting in a BadRequest. |
| TEST_F(ResourceProviderHttpApiTest, ValidJsonButInvalidProtobuf) |
| { |
| JSON::Object object; |
| object.values["string"] = "valid_json"; |
| |
| http::Request request; |
| request.method = "POST"; |
| request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); |
| request.headers["Accept"] = APPLICATION_JSON; |
| request.headers["Content-Type"] = APPLICATION_JSON; |
| request.body = stringify(object); |
| |
| ResourceProviderManager manager; |
| |
| Future<http::Response> response = manager.api(request, None()); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| AWAIT_EXPECT_RESPONSE_BODY_EQ( |
| "Failed to validate resource_provider::Call: " |
| "Expecting 'type' to be present", |
| response); |
| } |
| |
| |
| TEST_P(ResourceProviderHttpApiTest, MalformedContent) |
| { |
| const ContentType contentType = GetParam(); |
| |
| http::Request request; |
| request.method = "POST"; |
| request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); |
| request.headers["Accept"] = stringify(contentType); |
| request.headers["Content-Type"] = stringify(contentType); |
| request.body = "MALFORMED_CONTENT"; |
| |
| ResourceProviderManager manager; |
| |
| Future<http::Response> response = manager.api(request, None()); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| switch (contentType) { |
| case ContentType::PROTOBUF: |
| AWAIT_EXPECT_RESPONSE_BODY_EQ( |
| "Failed to parse body into Call protobuf", |
| response); |
| break; |
| case ContentType::JSON: |
| AWAIT_EXPECT_RESPONSE_BODY_EQ( |
| "Failed to parse body into JSON: " |
| "syntax error at line 1 near: MALFORMED_CONTENT", |
| response); |
| break; |
| case ContentType::RECORDIO: |
| break; |
| } |
| } |
| |
| |
| TEST_P(ResourceProviderHttpApiTest, UnsupportedContentMediaType) |
| { |
| Call call; |
| call.set_type(Call::SUBSCRIBE); |
| |
| Call::Subscribe* subscribe = call.mutable_subscribe(); |
| |
| mesos::v1::ResourceProviderInfo* info = |
| subscribe->mutable_resource_provider_info(); |
| |
| info->set_type("org.apache.mesos.rp.test"); |
| info->set_name("test"); |
| |
| const ContentType contentType = GetParam(); |
| const string unknownMediaType = "application/unknown-media-type"; |
| |
| http::Request request; |
| request.method = "POST"; |
| request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); |
| request.headers["Accept"] = stringify(contentType); |
| request.headers["Content-Type"] = unknownMediaType; |
| request.body = serialize(contentType, call); |
| |
| ResourceProviderManager manager; |
| |
| Future<http::Response> response = manager.api(request, None()); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response) |
| << response->body; |
| } |
| |
| |
| TEST_P(ResourceProviderHttpApiTest, Subscribe) |
| { |
| Call call; |
| call.set_type(Call::SUBSCRIBE); |
| |
| Call::Subscribe* subscribe = call.mutable_subscribe(); |
| |
| mesos::v1::ResourceProviderInfo* info = |
| subscribe->mutable_resource_provider_info(); |
| |
| info->set_type("org.apache.mesos.rp.test"); |
| info->set_name("test"); |
| |
| const ContentType contentType = GetParam(); |
| |
| http::Request request; |
| request.method = "POST"; |
| request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); |
| request.headers["Accept"] = stringify(contentType); |
| request.headers["Content-Type"] = stringify(contentType); |
| request.body = serialize(contentType, call); |
| |
| ResourceProviderManager manager; |
| |
| Future<http::Response> response = manager.api(request, None()); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body; |
| ASSERT_EQ(http::Response::PIPE, response->type); |
| |
| Option<http::Pipe::Reader> reader = response->reader; |
| ASSERT_SOME(reader); |
| |
| recordio::Reader<Event> responseDecoder( |
| ::recordio::Decoder<Event>( |
| lambda::bind(deserialize<Event>, contentType, lambda::_1)), |
| reader.get()); |
| |
| Future<Result<Event>> event = responseDecoder.read(); |
| AWAIT_READY(event); |
| ASSERT_SOME(event.get()); |
| |
| // Check event type is subscribed and the resource provider id is set. |
| ASSERT_EQ(Event::SUBSCRIBED, event->get().type()); |
| ASSERT_NE("", event->get().subscribed().provider_id().value()); |
| } |
| |
| |
| // This test starts an agent and connects directly with its resource |
| // provider endpoint. |
| TEST_P(ResourceProviderHttpApiTest, AgentEndpoint) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| Try<Owned<cluster::Slave>> agent = StartSlave(detector.get()); |
| ASSERT_SOME(agent); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| Call call; |
| call.set_type(Call::SUBSCRIBE); |
| |
| Call::Subscribe* subscribe = call.mutable_subscribe(); |
| |
| mesos::v1::ResourceProviderInfo* info = |
| subscribe->mutable_resource_provider_info(); |
| |
| info->set_type("org.apache.mesos.rp.test"); |
| info->set_name("test"); |
| |
| const ContentType contentType = GetParam(); |
| |
| http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); |
| headers["Accept"] = stringify(contentType); |
| |
| Future<http::Response> response = http::streaming::post( |
| agent.get()->pid, |
| "api/v1/resource_provider", |
| headers, |
| serialize(contentType, call), |
| stringify(contentType)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response); |
| ASSERT_EQ(http::Response::PIPE, response->type); |
| |
| Option<http::Pipe::Reader> reader = response->reader; |
| ASSERT_SOME(reader); |
| |
| recordio::Reader<Event> responseDecoder( |
| ::recordio::Decoder<Event>( |
| lambda::bind(deserialize<Event>, contentType, lambda::_1)), |
| reader.get()); |
| |
| Future<Result<Event>> event = responseDecoder.read(); |
| AWAIT_READY(event); |
| ASSERT_SOME(event.get()); |
| |
| // Check event type is subscribed and the resource provider id is set. |
| ASSERT_EQ(Event::SUBSCRIBED, event->get().type()); |
| ASSERT_NE("", event->get().subscribed().provider_id().value()); |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |