| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include <string> |
| #include <vector> |
| |
| #include <mesos/v1/executor/executor.hpp> |
| |
| #include <mesos/http.hpp> |
| |
| #include <process/clock.hpp> |
| #include <process/future.hpp> |
| #include <process/gtest.hpp> |
| #include <process/http.hpp> |
| #include <process/message.hpp> |
| #include <process/owned.hpp> |
| #include <process/pid.hpp> |
| |
| #include "common/http.hpp" |
| #include "common/recordio.hpp" |
| |
| #include "master/master.hpp" |
| |
| #include "master/detector/standalone.hpp" |
| |
| #include "tests/containerizer.hpp" |
| #include "tests/mesos.hpp" |
| |
| #include "tests/containerizer/mock_containerizer.hpp" |
| |
| using mesos::internal::master::Master; |
| |
| using mesos::internal::recordio::Reader; |
| |
| using mesos::internal::slave::Slave; |
| |
| using mesos::master::detector::MasterDetector; |
| using mesos::master::detector::StandaloneMasterDetector; |
| |
| using mesos::v1::executor::Call; |
| using mesos::v1::executor::Event; |
| |
| using process::Clock; |
| using process::Future; |
| using process::Message; |
| using process::Owned; |
| using process::PID; |
| using process::Promise; |
| |
| using process::http::BadRequest; |
| using process::http::MethodNotAllowed; |
| using process::http::NotAcceptable; |
| using process::http::OK; |
| using process::http::Pipe; |
| using process::http::Response; |
| using process::http::ServiceUnavailable; |
| using process::http::UnsupportedMediaType; |
| |
| using recordio::Decoder; |
| |
| using std::string; |
| using std::vector; |
| |
| using testing::Eq; |
| using testing::WithParamInterface; |
| |
| namespace mesos { |
| namespace internal { |
| namespace tests { |
| |
| |
| class ExecutorHttpApiTest |
| : public MesosTest, |
| public WithParamInterface<ContentType> |
| { |
| protected: |
| slave::Flags CreateSlaveFlags() override |
| { |
| slave::Flags flags = MesosTest::CreateSlaveFlags(); |
| |
| #ifdef USE_SSL_SOCKET |
| // Disable executor authentication on the agent. Executor authentication |
| // currently has SSL as a dependency, so this is only necessary if Mesos was |
| // built with SSL support. |
| flags.authenticate_http_executors = false; |
| flags.executor_secret_key = None(); |
| #endif // USE_SSL_SOCKET |
| |
| return flags; |
| } |
| }; |
| |
| |
| // The tests are parameterized by the content type of the request. |
| INSTANTIATE_TEST_CASE_P( |
| ContentType, |
| ExecutorHttpApiTest, |
| ::testing::Values(ContentType::PROTOBUF, ContentType::JSON)); |
| |
| |
// TODO(anand): Add more validation tests for:
// - If the slave is still recovering, the call should return
//   ServiceUnavailable (so far only the SUBSCRIBE case is exercised,
//   by `SubscribeBeforeContainerizerRecovery`).
// - If the Executor is not found, the call should return
//   BadRequest.
// - If the Executor has not registered and sends a Call message other
//   than Subscribe, the call should return Forbidden.
| |
| |
| // This test expects a BadRequest when 'Content-Type' is omitted. |
| TEST_F(ExecutorHttpApiTest, NoContentType) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| Call call; |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| call.set_type(Call::MESSAGE); |
| |
| call.mutable_message()->set_data("hello world"); |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| None(), |
| serialize(ContentType::JSON, call), |
| None()); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| |
| // This test sends a valid JSON blob that cannot be deserialized |
| // into a valid protobuf resulting in a BadRequest. |
| TEST_F(ExecutorHttpApiTest, ValidJsonButInvalidProtobuf) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| JSON::Object object; |
| object.values["string"] = "valid_json"; |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| stringify(object), |
| APPLICATION_JSON); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| |
| // This test sends a malformed body that cannot be deserialized |
| // into a valid protobuf resulting in a BadRequest. |
| TEST_P(ExecutorHttpApiTest, MalformedContent) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| const string body = "MALFORMED_CONTENT"; |
| |
| const ContentType contentType = GetParam(); |
| process::http::Headers headers; |
| headers["Accept"] = stringify(contentType); |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| body, |
| stringify(contentType)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| |
| // This test sets an unsupported media type as Content-Type. This |
| // should result in a 415 (UnsupportedMediaType) response. |
| TEST_P(ExecutorHttpApiTest, UnsupportedContentMediaType) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| ContentType contentType = GetParam(); |
| process::http::Headers headers; |
| headers["Accept"] = stringify(contentType); |
| |
| Call call; |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| call.set_type(Call::SUBSCRIBE); |
| |
| call.mutable_subscribe(); |
| |
| const string unknownMediaType = "application/unknown-media-type"; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(contentType, call), |
| unknownMediaType); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response); |
| } |
| |
| |
| // This test sends a Call from an unknown FrameworkID. The call |
| // should return a BadRequest. |
| TEST_P(ExecutorHttpApiTest, MessageFromUnknownFramework) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| ContentType contentType = GetParam(); |
| process::http::Headers headers; |
| headers["Accept"] = stringify(contentType); |
| |
| Call call; |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| call.set_type(Call::MESSAGE); |
| |
| call.mutable_message()->set_data("hello world"); |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(contentType, call), |
| stringify(contentType)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| |
| // This test sends a GET request to the executor HTTP endpoint instead |
| // of a POST. The call should return a MethodNotAllowed response. |
| TEST_F(ExecutorHttpApiTest, GetRequest) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| Future<Response> response = process::http::get( |
| slave.get()->pid, |
| "api/v1/executor"); |
| |
| AWAIT_READY(response); |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(MethodNotAllowed({"POST"}).status, response); |
| } |
| |
| |
| // This test sends in an Accept:*/* header meaning it would |
| // accept any media type in the response. We expect the |
| // default "application/json" media type. |
| TEST_P(ExecutorHttpApiTest, DefaultAccept) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| auto executor = std::make_shared<v1::MockHTTPExecutor>(); |
| |
| ExecutorID executorId = DEFAULT_EXECUTOR_ID; |
| TestContainerizer containerizer(executorId, executor); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| &containerizer, |
| flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(offers); |
| |
| ASSERT_EQ(1u, offers->size()); |
| |
| Future<v1::executor::Mesos*> executorLib; |
| EXPECT_CALL(*executor, connected(_)) |
| .WillOnce(FutureArg<0>(&executorLib)); |
| |
| TaskInfo taskInfo = createTask(offers.get()[0], "", executorId); |
| driver.launchTasks(offers.get()[0].id(), {taskInfo}); |
| |
| // Wait for the executor to be launched before sending |
| // an executor subscribe request. |
| AWAIT_READY(executorLib); |
| |
| // Only subscribe needs to 'Accept' JSON or protobuf. |
| Call call; |
| call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get())); |
| call.mutable_executor_id()->CopyFrom(evolve(executorId)); |
| |
| call.set_type(Call::SUBSCRIBE); |
| |
| call.mutable_subscribe(); |
| |
| // Retrieve the parameter passed as content type to this test. |
| const ContentType contentType = GetParam(); |
| |
| process::http::Headers headers; |
| headers["Accept"] = "*/*"; |
| |
| Future<Response> response = process::http::streaming::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(contentType, call), |
| stringify(contentType)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test does not set any Accept header for the subscribe call. |
| // The default response media type should be "application/json" in |
| // this case. |
| TEST_P(ExecutorHttpApiTest, NoAcceptHeader) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| auto executor = std::make_shared<v1::MockHTTPExecutor>(); |
| |
| ExecutorID executorId = DEFAULT_EXECUTOR_ID; |
| TestContainerizer containerizer(executorId, executor); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| &containerizer, |
| flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(offers); |
| |
| ASSERT_EQ(1u, offers->size()); |
| |
| Future<v1::executor::Mesos*> executorLib; |
| EXPECT_CALL(*executor, connected(_)) |
| .WillOnce(FutureArg<0>(&executorLib)); |
| |
| TaskInfo taskInfo = createTask(offers.get()[0], "", executorId); |
| driver.launchTasks(offers.get()[0].id(), {taskInfo}); |
| |
| // Wait for the executor to be launched before sending |
| // an executor subscribe request. |
| AWAIT_READY(executorLib); |
| |
| // Only subscribe needs to 'Accept' JSON or protobuf. |
| Call call; |
| call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get())); |
| call.mutable_executor_id()->CopyFrom(evolve(executorId)); |
| |
| call.set_type(Call::SUBSCRIBE); |
| |
| call.mutable_subscribe(); |
| |
| // Retrieve the parameter passed as content type to this test. |
| const ContentType contentType = GetParam(); |
| |
| // No 'Accept' header leads to all media types considered |
| // acceptable. JSON will be chosen by default. |
| process::http::Headers headers; |
| |
| Future<Response> response = process::http::streaming::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(contentType, call), |
| stringify(contentType)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| |
| // This test sends a unsupported Accept media type for the Accept |
| // header. The response should be NotAcceptable in this case. |
| TEST_P(ExecutorHttpApiTest, NotAcceptable) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| // Retrieve the parameter passed as content type to this test. |
| const ContentType contentType = GetParam(); |
| |
| process::http::Headers headers; |
| headers["Accept"] = "foo"; |
| |
| // Only subscribe needs to 'Accept' JSON or protobuf. |
| Call call; |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| |
| call.set_type(Call::SUBSCRIBE); |
| |
| call.mutable_subscribe(); |
| |
| Future<Response> response = process::http::streaming::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(contentType, call), |
| stringify(contentType)); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotAcceptable().status, response); |
| } |
| |
| |
| TEST_P(ExecutorHttpApiTest, ValidProtobufInvalidCall) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| // We send a Call protobuf message with missing |
| // required message per type. |
| { |
| Call call; |
| call.set_type(Call::SUBSCRIBE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| { |
| Call call; |
| call.set_type(Call::UPDATE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| { |
| Call call; |
| call.set_type(Call::MESSAGE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("dummy_executor_id"); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| } |
| |
| |
| TEST_P(ExecutorHttpApiTest, StatusUpdateCallFailedValidation) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); |
| ASSERT_SOME(slave); |
| |
| AWAIT_READY(__recover); |
| |
| // Wait for recovery to be complete. |
| Clock::pause(); |
| Clock::settle(); |
| |
| // We send a Call::Update message with inconsistent executor id between |
| // Call::executor_id and Call::Update::TaskInfo::executor_id. |
| // This should result in failed validation. |
| { |
| Call call; |
| call.set_type(Call::UPDATE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("call_level_executor_id"); |
| |
| v1::TaskStatus* status = call.mutable_update()->mutable_status(); |
| |
| status->mutable_executor_id()->set_value("update_level_executor_id"); |
| status->set_state(mesos::v1::TaskState::TASK_STARTING); |
| status->mutable_task_id()->set_value("dummy_task_id"); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| // We send a Call Update message with an invalid UUID. |
| // This should result in failed validation. |
| { |
| Call call; |
| call.set_type(Call::UPDATE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("call_level_executor_id"); |
| |
| v1::TaskStatus* status = call.mutable_update()->mutable_status(); |
| |
| status->set_uuid("dummy_uuid"); |
| status->mutable_task_id()->set_value("dummy_task_id"); |
| status->set_state(mesos::v1::TaskState::TASK_STARTING); |
| status->set_source(mesos::v1::TaskStatus::SOURCE_EXECUTOR); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response); |
| } |
| |
| // We send a Call Update message with a TASK_STAGING |
| // status update. This should fail validation. |
| { |
| Call call; |
| call.set_type(Call::UPDATE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("call_level_executor_id"); |
| |
| v1::TaskStatus* status = call.mutable_update()->mutable_status(); |
| |
| status->mutable_executor_id()->set_value("call_level_executor_id"); |
| status->mutable_task_id()->set_value("dummy_task_id"); |
| status->set_state(mesos::v1::TaskState::TASK_STAGING); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> responseStatusUpdate = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, responseStatusUpdate); |
| } |
| |
| // We send a Call Update message with a different source than |
| // SOURCE_EXECUTOR in the status update. This should fail validation. |
| { |
| Call call; |
| call.set_type(Call::UPDATE); |
| call.mutable_framework_id()->set_value("dummy_framework_id"); |
| call.mutable_executor_id()->set_value("call_level_executor_id"); |
| |
| v1::TaskStatus* status = call.mutable_update()->mutable_status(); |
| |
| status->mutable_executor_id()->set_value("call_level_executor_id"); |
| status->mutable_task_id()->set_value("dummy_task_id"); |
| status->set_state(mesos::v1::TaskState::TASK_STARTING); |
| status->set_source(mesos::v1::TaskStatus::SOURCE_MASTER); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> responseStatusUpdate = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::PROTOBUF, call), |
| APPLICATION_PROTOBUF); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, responseStatusUpdate); |
| } |
| } |
| |
| |
| // This test verifies that the executor cannot subscribe with the agent |
| // before it recovers the containerizer. |
| TEST_F(ExecutorHttpApiTest, SubscribeBeforeContainerizerRecovery) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| MockContainerizer mockContainerizer; |
| StandaloneMasterDetector detector; |
| |
| Future<Nothing> recover = FUTURE_DISPATCH(_, &Slave::recover); |
| |
| Promise<Nothing> recoveryPromise; |
| EXPECT_CALL(mockContainerizer, recover(_)) |
| .WillOnce(Return(recoveryPromise.future())); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| &detector, |
| &mockContainerizer, |
| flags); |
| ASSERT_SOME(slave); |
| |
| // Ensure that the agent has atleast set up HTTP routes upon startup. |
| AWAIT_READY(recover); |
| |
| // Send a subscribe call. This should fail with a '503 Service Unavailable' |
| // since the agent hasn't finished recovering the containerizer. |
| |
| Call call; |
| call.mutable_framework_id()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO.id()); |
| call.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); |
| |
| call.set_type(Call::SUBSCRIBE); |
| |
| call.mutable_subscribe(); |
| |
| process::http::Headers headers; |
| headers["Accept"] = APPLICATION_JSON; |
| |
| Future<Response> response = process::http::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(ContentType::JSON, call), |
| APPLICATION_JSON); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(ServiceUnavailable().status, response); |
| |
| // The destructor of `cluster::Slave` will try to clean up any |
| // remaining containers by inspecting the result of `containers()`. |
| EXPECT_CALL(mockContainerizer, containers()) |
| .WillRepeatedly(Return(hashset<ContainerID>())); |
| } |
| |
| |
| // This test verifies if the executor is able to receive a Subscribed |
| // event in response to a Subscribe call request. |
| TEST_P(ExecutorHttpApiTest, Subscribe) |
| { |
| Try<Owned<cluster::Master>> master = StartMaster(); |
| ASSERT_SOME(master); |
| |
| ExecutorID executorId = DEFAULT_EXECUTOR_ID; |
| MockExecutor exec(executorId); |
| TestContainerizer containerizer(&exec); |
| |
| Owned<MasterDetector> detector = master.get()->createDetector(); |
| |
| slave::Flags flags = CreateSlaveFlags(); |
| |
| Try<Owned<cluster::Slave>> slave = StartSlave( |
| detector.get(), |
| &containerizer, |
| flags); |
| ASSERT_SOME(slave); |
| |
| MockScheduler sched; |
| MesosSchedulerDriver driver( |
| &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); |
| |
| Future<FrameworkID> frameworkId; |
| EXPECT_CALL(sched, registered(&driver, _, _)) |
| .WillOnce(FutureArg<1>(&frameworkId)); |
| |
| Future<vector<Offer>> offers; |
| EXPECT_CALL(sched, resourceOffers(&driver, _)) |
| .WillOnce(FutureArg<1>(&offers)); |
| |
| driver.start(); |
| |
| AWAIT_READY(frameworkId); |
| AWAIT_READY(offers); |
| |
| ASSERT_EQ(1u, offers->size()); |
| |
| Future<Message> registerExecutorMessage = |
| DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _); |
| |
| TaskInfo taskInfo = createTask(offers.get()[0], "", executorId); |
| driver.launchTasks(offers.get()[0].id(), {taskInfo}); |
| |
| // Drop the `RegisterExecutorMessage` and then send a `Subscribe` request |
| // from the HTTP based executor. |
| AWAIT_READY(registerExecutorMessage); |
| |
| Call call; |
| call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get())); |
| call.mutable_executor_id()->CopyFrom(evolve(executorId)); |
| |
| call.set_type(Call::SUBSCRIBE); |
| |
| call.mutable_subscribe(); |
| |
| // Retrieve the parameter passed as content type to this test. |
| const ContentType contentType = GetParam(); |
| const string contentTypeString = stringify(contentType); |
| |
| process::http::Headers headers; |
| headers["Accept"] = contentTypeString; |
| |
| Future<Response> response = process::http::streaming::post( |
| slave.get()->pid, |
| "api/v1/executor", |
| headers, |
| serialize(contentType, call), |
| contentTypeString); |
| |
| AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response); |
| AWAIT_EXPECT_RESPONSE_HEADER_EQ(contentTypeString, "Content-Type", response); |
| |
| ASSERT_EQ(Response::PIPE, response->type); |
| |
| Option<Pipe::Reader> reader = response->reader; |
| ASSERT_SOME(reader); |
| |
| auto deserializer = |
| lambda::bind(deserialize<Event>, contentType, lambda::_1); |
| |
| Reader<Event> responseDecoder( |
| Decoder<Event>(deserializer), |
| reader.get()); |
| |
| Future<Result<Event>> event = responseDecoder.read(); |
| AWAIT_READY(event); |
| ASSERT_SOME(event.get()); |
| |
| // Check event type is subscribed and if the ExecutorID matches. |
| ASSERT_EQ(Event::SUBSCRIBED, event->get().type()); |
| ASSERT_EQ(event->get().subscribed().executor_info().executor_id(), |
| call.executor_id()); |
| ASSERT_TRUE(event->get().subscribed().has_container_id()); |
| |
| reader->close(); |
| |
| driver.stop(); |
| driver.join(); |
| } |
| |
| } // namespace tests { |
| } // namespace internal { |
| } // namespace mesos { |