blob: d111547026271766f1d632e73126c432a571033a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <vector>
#include <mesos/v1/executor/executor.hpp>
#include <mesos/http.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/http.hpp>
#include <process/message.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include "common/http.hpp"
#include "common/recordio.hpp"
#include "master/master.hpp"
#include "master/detector/standalone.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
#include "tests/containerizer/mock_containerizer.hpp"
using mesos::internal::master::Master;
using mesos::internal::recordio::Reader;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
using mesos::v1::executor::Call;
using mesos::v1::executor::Event;
using process::Clock;
using process::Future;
using process::Message;
using process::Owned;
using process::PID;
using process::Promise;
using process::http::BadRequest;
using process::http::MethodNotAllowed;
using process::http::NotAcceptable;
using process::http::OK;
using process::http::Pipe;
using process::http::Response;
using process::http::ServiceUnavailable;
using process::http::UnsupportedMediaType;
using recordio::Decoder;
using std::string;
using std::vector;
using testing::AtMost;
using testing::DoAll;
using testing::Eq;
using testing::WithParamInterface;
namespace mesos {
namespace v1 {
namespace executor {
// Forward defined constant found in `executor/executor.cpp`.
// TODO(josephw): Remove this when this constant is moved into a header.
extern const Duration DEFAULT_HEARTBEAT_CALL_INTERVAL;
} // namespace executor {
} // namespace v1 {
namespace internal {
namespace tests {
class ExecutorHttpApiTest
: public MesosTest,
public WithParamInterface<ContentType>
{
protected:
slave::Flags CreateSlaveFlags() override
{
slave::Flags flags = MesosTest::CreateSlaveFlags();
#ifdef USE_SSL_SOCKET
// Disable executor authentication on the agent. Executor authentication
// currently has SSL as a dependency, so this is only necessary if Mesos was
// built with SSL support.
flags.authenticate_http_executors = false;
#endif // USE_SSL_SOCKET
return flags;
}
};
// The tests are parameterized by the content type of the request.
INSTANTIATE_TEST_CASE_P(
ContentType,
ExecutorHttpApiTest,
::testing::Values(ContentType::PROTOBUF, ContentType::JSON));
// TODO(anand): Add more validation tests for:
// - If the slave is still recovering, the call should return
// ServiceUnavailable.
// - If Executor is not found, the call should return
// BadRequest.
// - If Executor has not registered and sends a Call message other
// than Subscribe, the call should return Forbidden.
// This test expects a BadRequest when 'Content-Type' is omitted.
TEST_F(ExecutorHttpApiTest, NoContentType)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
Call call;
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
call.set_type(Call::MESSAGE);
call.mutable_message()->set_data("hello world");
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
None(),
serialize(ContentType::JSON, call),
None());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
// This test sends a valid JSON blob that cannot be deserialized
// into a valid protobuf resulting in a BadRequest.
TEST_F(ExecutorHttpApiTest, ValidJsonButInvalidProtobuf)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
JSON::Object object;
object.values["string"] = "valid_json";
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
stringify(object),
APPLICATION_JSON);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
// This test sends a malformed body that cannot be deserialized
// into a valid protobuf resulting in a BadRequest.
TEST_P(ExecutorHttpApiTest, MalformedContent)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
const string body = "MALFORMED_CONTENT";
const ContentType contentType = GetParam();
process::http::Headers headers;
headers["Accept"] = stringify(contentType);
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
body,
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
// This test sets an unsupported media type as Content-Type. This
// should result in a 415 (UnsupportedMediaType) response.
TEST_P(ExecutorHttpApiTest, UnsupportedContentMediaType)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
ContentType contentType = GetParam();
process::http::Headers headers;
headers["Accept"] = stringify(contentType);
Call call;
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
const string unknownMediaType = "application/unknown-media-type";
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
unknownMediaType);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response);
}
// This test sends a Call from an unknown FrameworkID. The call
// should return a BadRequest.
TEST_P(ExecutorHttpApiTest, MessageFromUnknownFramework)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
ContentType contentType = GetParam();
process::http::Headers headers;
headers["Accept"] = stringify(contentType);
Call call;
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
call.set_type(Call::MESSAGE);
call.mutable_message()->set_data("hello world");
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
// This test sends a GET request to the executor HTTP endpoint instead
// of a POST. The call should return a MethodNotAllowed response.
TEST_F(ExecutorHttpApiTest, GetRequest)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
Future<Response> response = process::http::get(
slave.get()->pid,
"api/v1/executor");
AWAIT_READY(response);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(MethodNotAllowed({"POST"}).status, response);
}
// This test sends in an Accept:*/* header meaning it would
// accept any media type in the response. We expect the
// default "application/json" media type.
TEST_P(ExecutorHttpApiTest, DefaultAccept)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(
detector.get(),
&containerizer,
flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Future<v1::executor::Mesos*> executorLib;
EXPECT_CALL(*executor, connected(_))
.WillOnce(FutureArg<0>(&executorLib));
TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
driver.launchTasks(offers.get()[0].id(), {taskInfo});
// Wait for the executor to be launched before sending
// an executor subscribe request.
AWAIT_READY(executorLib);
// Only subscribe needs to 'Accept' JSON or protobuf.
Call call;
call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
call.mutable_executor_id()->CopyFrom(evolve(executorId));
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
// Retrieve the parameter passed as content type to this test.
const ContentType contentType = GetParam();
process::http::Headers headers;
headers["Accept"] = "*/*";
Future<Response> response = process::http::streaming::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
driver.stop();
driver.join();
}
// This test does not set any Accept header for the subscribe call.
// The default response media type should be "application/json" in
// this case.
TEST_P(ExecutorHttpApiTest, NoAcceptHeader)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
auto executor = std::make_shared<v1::MockHTTPExecutor>();
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
TestContainerizer containerizer(executorId, executor);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(
detector.get(),
&containerizer,
flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Future<v1::executor::Mesos*> executorLib;
EXPECT_CALL(*executor, connected(_))
.WillOnce(FutureArg<0>(&executorLib));
TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
driver.launchTasks(offers.get()[0].id(), {taskInfo});
// Wait for the executor to be launched before sending
// an executor subscribe request.
AWAIT_READY(executorLib);
// Only subscribe needs to 'Accept' JSON or protobuf.
Call call;
call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
call.mutable_executor_id()->CopyFrom(evolve(executorId));
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
// Retrieve the parameter passed as content type to this test.
const ContentType contentType = GetParam();
// No 'Accept' header leads to all media types considered
// acceptable. JSON will be chosen by default.
process::http::Headers headers;
Future<Response> response = process::http::streaming::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
driver.stop();
driver.join();
}
// This test sends a unsupported Accept media type for the Accept
// header. The response should be NotAcceptable in this case.
TEST_P(ExecutorHttpApiTest, NotAcceptable)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
// Retrieve the parameter passed as content type to this test.
const ContentType contentType = GetParam();
process::http::Headers headers;
headers["Accept"] = "foo";
// Only subscribe needs to 'Accept' JSON or protobuf.
Call call;
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
Future<Response> response = process::http::streaming::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotAcceptable().status, response);
}
TEST_P(ExecutorHttpApiTest, ValidProtobufInvalidCall)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
// We send a Call protobuf message with missing
// required message per type.
{
Call call;
call.set_type(Call::SUBSCRIBE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
{
Call call;
call.set_type(Call::UPDATE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
{
Call call;
call.set_type(Call::MESSAGE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("dummy_executor_id");
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
}
TEST_P(ExecutorHttpApiTest, StatusUpdateCallFailedValidation)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();
Clock::settle();
// We send a Call::Update message with inconsistent executor id between
// Call::executor_id and Call::Update::TaskInfo::executor_id.
// This should result in failed validation.
{
Call call;
call.set_type(Call::UPDATE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("call_level_executor_id");
v1::TaskStatus* status = call.mutable_update()->mutable_status();
status->mutable_executor_id()->set_value("update_level_executor_id");
status->set_state(mesos::v1::TaskState::TASK_STARTING);
status->mutable_task_id()->set_value("dummy_task_id");
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
// We send a Call Update message with an invalid UUID.
// This should result in failed validation.
{
Call call;
call.set_type(Call::UPDATE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("call_level_executor_id");
v1::TaskStatus* status = call.mutable_update()->mutable_status();
status->set_uuid("dummy_uuid");
status->mutable_task_id()->set_value("dummy_task_id");
status->set_state(mesos::v1::TaskState::TASK_STARTING);
status->set_source(mesos::v1::TaskStatus::SOURCE_EXECUTOR);
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
}
// We send a Call Update message with a TASK_STAGING
// status update. This should fail validation.
{
Call call;
call.set_type(Call::UPDATE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("call_level_executor_id");
v1::TaskStatus* status = call.mutable_update()->mutable_status();
status->mutable_executor_id()->set_value("call_level_executor_id");
status->mutable_task_id()->set_value("dummy_task_id");
status->set_state(mesos::v1::TaskState::TASK_STAGING);
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> responseStatusUpdate = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, responseStatusUpdate);
}
// We send a Call Update message with a different source than
// SOURCE_EXECUTOR in the status update. This should fail validation.
{
Call call;
call.set_type(Call::UPDATE);
call.mutable_framework_id()->set_value("dummy_framework_id");
call.mutable_executor_id()->set_value("call_level_executor_id");
v1::TaskStatus* status = call.mutable_update()->mutable_status();
status->mutable_executor_id()->set_value("call_level_executor_id");
status->mutable_task_id()->set_value("dummy_task_id");
status->set_state(mesos::v1::TaskState::TASK_STARTING);
status->set_source(mesos::v1::TaskStatus::SOURCE_MASTER);
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> responseStatusUpdate = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::PROTOBUF, call),
APPLICATION_PROTOBUF);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, responseStatusUpdate);
}
}
// This test verifies that the executor cannot subscribe with the agent
// before it recovers the containerizer.
TEST_F(ExecutorHttpApiTest, SubscribeBeforeContainerizerRecovery)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
MockContainerizer mockContainerizer;
StandaloneMasterDetector detector;
Future<Nothing> recover = FUTURE_DISPATCH(_, &Slave::recover);
Promise<Nothing> recoveryPromise;
EXPECT_CALL(mockContainerizer, recover(_))
.WillOnce(Return(recoveryPromise.future()));
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(
&detector,
&mockContainerizer,
flags);
ASSERT_SOME(slave);
// Ensure that the agent has atleast set up HTTP routes upon startup.
AWAIT_READY(recover);
// Send a subscribe call. This should fail with a '503 Service Unavailable'
// since the agent hasn't finished recovering the containerizer.
Call call;
call.mutable_framework_id()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO.id());
call.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
process::http::Headers headers;
headers["Accept"] = APPLICATION_JSON;
Future<Response> response = process::http::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(ContentType::JSON, call),
APPLICATION_JSON);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(ServiceUnavailable().status, response);
// The destructor of `cluster::Slave` will try to clean up any
// remaining containers by inspecting the result of `containers()`.
EXPECT_CALL(mockContainerizer, containers())
.WillRepeatedly(Return(hashset<ContainerID>()));
}
// This test verifies if the executor is able to receive a Subscribed
// event in response to a Subscribe call request.
TEST_P(ExecutorHttpApiTest, Subscribe)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
MockExecutor exec(executorId);
TestContainerizer containerizer(&exec);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags flags = CreateSlaveFlags();
Try<Owned<cluster::Slave>> slave = StartSlave(
detector.get(),
&containerizer,
flags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
Future<Message> registerExecutorMessage =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
TaskInfo taskInfo = createTask(offers.get()[0], "", executorId);
driver.launchTasks(offers.get()[0].id(), {taskInfo});
// Drop the `RegisterExecutorMessage` and then send a `Subscribe` request
// from the HTTP based executor.
AWAIT_READY(registerExecutorMessage);
Call call;
call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
call.mutable_executor_id()->CopyFrom(evolve(executorId));
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
// Retrieve the parameter passed as content type to this test.
const ContentType contentType = GetParam();
const string contentTypeString = stringify(contentType);
process::http::Headers headers;
headers["Accept"] = contentTypeString;
Future<Response> response = process::http::streaming::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
contentTypeString);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(contentTypeString, "Content-Type", response);
ASSERT_EQ(Response::PIPE, response->type);
Option<Pipe::Reader> reader = response->reader;
ASSERT_SOME(reader);
Reader<Event> responseDecoder(
lambda::bind(deserialize<Event>, contentType, lambda::_1),
reader.get());
Future<Result<Event>> event = responseDecoder.read();
AWAIT_READY(event);
ASSERT_SOME(event.get());
// Check event type is subscribed and if the ExecutorID matches.
ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
ASSERT_EQ(event->get().subscribed().executor_info().executor_id(),
call.executor_id());
ASSERT_TRUE(event->get().subscribed().has_container_id());
reader->close();
driver.stop();
driver.join();
}
// This test verifies that heartbeats are sent from the agent to the executor.
TEST_P(ExecutorHttpApiTest, HeartbeatEvents)
{
Clock::pause();
const ContentType contentType = GetParam();
const string contentTypeString = stringify(contentType);
Resources resources = Resources::parse("cpus:0.1;mem:32;disk:32").get();
master::Flags masterFlags = CreateMasterFlags();
Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
ExecutorID executorId = DEFAULT_EXECUTOR_ID;
MockExecutor exec(executorId);
TestContainerizer containerizer(&exec);
Owned<MasterDetector> detector = master.get()->createDetector();
slave::Flags slaveFlags = CreateSlaveFlags();
slaveFlags.registration_backoff_factor = Seconds(0);
Try<Owned<cluster::Slave>> slave = StartSlave(
detector.get(),
&containerizer,
slaveFlags);
ASSERT_SOME(slave);
MockScheduler sched;
MesosSchedulerDriver driver(
&sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureArg<1>(&frameworkId));
Future<vector<Offer>> offers;
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
driver.start();
AWAIT_READY(frameworkId);
AWAIT_READY(offers);
ASSERT_EQ(1u, offers->size());
// Drop the `RegisterExecutorMessage` so the test body can
// pretend to be the executor.
Future<Message> registerExecutorMessage =
DROP_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
TaskInfo taskInfo1 = createTask(
offers.get()[0].slave_id(), resources, "", executorId);
driver.launchTasks(offers.get()[0].id(), {taskInfo1});
AWAIT_READY(registerExecutorMessage);
// The test body will pretend to be the executor, which gives us
// straightforward access to any events that are sent to the executor.
Call call;
call.mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
call.mutable_executor_id()->CopyFrom(evolve(executorId));
call.set_type(Call::SUBSCRIBE);
call.mutable_subscribe();
process::http::Headers headers;
headers["Accept"] = contentTypeString;
Future<Response> response = process::http::streaming::post(
slave.get()->pid,
"api/v1/executor",
headers,
serialize(contentType, call),
contentTypeString);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(
contentTypeString, "Content-Type", response);
ASSERT_EQ(Response::PIPE, response->type);
Option<Pipe::Reader> reader = response->reader;
ASSERT_SOME(reader);
Reader<Event> responseDecoder(
lambda::bind(deserialize<Event>, contentType, lambda::_1),
reader.get());
Future<Result<Event>> event = responseDecoder.read();
AWAIT_READY(event);
ASSERT_SOME(event.get());
// Check event type is subscribed and if the ExecutorID matches.
ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
ASSERT_EQ(event->get().subscribed().executor_info().executor_id(),
call.executor_id());
ASSERT_TRUE(event->get().subscribed().has_container_id());
// Wait for the agent to send the first task, so we don't race against
// this later on. This (pretend) executor drops the task though.
event = responseDecoder.read();
AWAIT_READY(event);
ASSERT_SOME(event.get());
ASSERT_EQ(Event::LAUNCH, event->get().type());
// The next event should be the heartbeat.
event = responseDecoder.read();
ASSERT_TRUE(event.isPending());
Clock::advance(slave::DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL);
AWAIT_READY(event);
ASSERT_SOME(event.get());
ASSERT_EQ(Event::HEARTBEAT, event->get().type());
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
driver.stop();
driver.join();
Clock::resume();
}
// Mock agent for intercepting messages from HTTP executors.
class BareBonesAgentProcess : public process::Process<BareBonesAgentProcess>
{
public:
BareBonesAgentProcess()
: process::ProcessBase(process::ID::generate("bare-bones-agent")) {}
MOCK_METHOD1(
executor,
Future<process::http::Response>(const process::http::Request&));
protected:
void initialize() override
{
route(
"/api/v1/executor",
None(),
&BareBonesAgentProcess::executor);
}
};
class BareBonesAgent
{
public:
BareBonesAgent() : process(new BareBonesAgentProcess())
{
process::spawn(process.get());
}
~BareBonesAgent()
{
process::terminate(process.get());
process::wait(process.get());
}
Owned<BareBonesAgentProcess> process;
};
// This test verifies that heartbeats are sent from the executor driver
// to the agent.
TEST_F(ExecutorHttpApiTest, HeartbeatCalls)
{
Clock::pause();
// Launch an HTTP server that pretends to be the agent in this test.
BareBonesAgent agent;
// Pre-fill a response pipe with a SUBSCRIBED response.
process::http::Pipe pipe;
{
v1::executor::Event event;
event.set_type(v1::executor::Event::SUBSCRIBED);
v1::ExecutorInfo* executorInfo =
event.mutable_subscribed()->mutable_executor_info();
executorInfo->set_type(v1::ExecutorInfo::DEFAULT);
executorInfo->mutable_executor_id()->set_value("empty");
v1::FrameworkInfo* frameworkInfo =
event.mutable_subscribed()->mutable_framework_info();
frameworkInfo->mutable_id()->set_value("fake");
frameworkInfo->set_user("whoever");
frameworkInfo->set_name("anonymous");
event.mutable_subscribed()->mutable_agent_info()->set_hostname(":P");
event.mutable_subscribed()->mutable_container_id()->set_value(":P");
pipe.writer().write(
::recordio::encode(serialize(ContentType::PROTOBUF, event)));
}
// Set the expectation for an executor to register with the fake agent.
process::http::Response subscribed = process::http::OK();
subscribed.type = process::http::Response::PIPE;
subscribed.reader = pipe.reader();
Future<process::http::Request> expectedSubscribe;
Promise<Nothing> eventSubscribed;
EXPECT_CALL(*agent.process, executor(_))
.WillOnce(DoAll(
FutureArg<0>(&expectedSubscribe),
Return(subscribed)));
// Start the executor driver.
// NOTE: We use an Owned pointer here because the lambdas passed to the
// executor driver's constructor reference the executor driver itself,
// which is not allowed on some compilers if the driver is stack allocated.
Owned<mesos::v1::executor::Mesos> executor;
// Since this test only cares about the calls sent to the agent,
// any events and callbacks fired by this driver are used purely for
// synchronization purposes.
executor.reset(new mesos::v1::executor::Mesos(
ContentType::PROTOBUF,
[&executor]() mutable {
v1::executor::Call v1Call;
v1Call.set_type(v1::executor::Call::SUBSCRIBE);
v1Call.mutable_executor_id()->set_value("empty");
v1Call.mutable_framework_id()->set_value("fake");
v1Call.mutable_subscribe();
executor->send(v1Call);
},
[]() {},
[&eventSubscribed](std::queue<v1::executor::Event> queue) {
while(!queue.empty()) {
const v1::executor::Event& event = queue.front();
if (event.type() == v1::executor::Event::SUBSCRIBED) {
eventSubscribed.set(Nothing());
}
queue.pop();
}
},
{
{ "MESOS_SLAVE_PID", stringify(agent.process->self()) },
{ "MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD",
stringify(slave::DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD) }
}));
// Wait for the call to arrive at our fake agent.
AWAIT_READY(expectedSubscribe);
Option<string> contentType = expectedSubscribe->headers.get("Content-Type");
ASSERT_SOME(contentType);
ASSERT_EQ(APPLICATION_PROTOBUF, contentType.get());
{
v1::executor::Call v1Call;
ASSERT_TRUE(v1Call.ParseFromString(expectedSubscribe->body));
ASSERT_EQ(v1::executor::Call::SUBSCRIBE, v1Call.type());
}
// Wait for the executor to receive the response from our fake agent.
AWAIT_READY(eventSubscribed.future());
// Advance time and expect to get a heartbeat.
Future<process::http::Request> expectedHeartbeat;
EXPECT_CALL(*agent.process, executor(_))
.WillOnce(DoAll(
FutureArg<0>(&expectedHeartbeat),
Return(process::http::Accepted())));
Clock::advance(mesos::v1::executor::DEFAULT_HEARTBEAT_CALL_INTERVAL);
AWAIT_READY(expectedHeartbeat);
contentType = expectedHeartbeat->headers.get("Content-Type");
ASSERT_SOME(contentType);
ASSERT_EQ(APPLICATION_PROTOBUF, contentType.get());
{
v1::executor::Call v1Call;
ASSERT_TRUE(v1Call.ParseFromString(expectedHeartbeat->body));
ASSERT_EQ(v1::executor::Call::HEARTBEAT, v1Call.type());
}
Clock::resume();
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {